diff --git a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/AbstractJobFacade.java b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/AbstractJobFacade.java new file mode 100644 index 0000000000..cee06ed49d --- /dev/null +++ b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/AbstractJobFacade.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.elasticjob.kernel.executor.facade; + +import com.google.common.base.Strings; + +import java.util.Collection; +import java.util.Comparator; +import java.util.stream.Collectors; + +import org.apache.shardingsphere.elasticjob.api.JobConfiguration; +import org.apache.shardingsphere.elasticjob.kernel.infra.exception.JobExecutionEnvironmentException; +import org.apache.shardingsphere.elasticjob.kernel.internal.config.ConfigurationService; +import org.apache.shardingsphere.elasticjob.kernel.internal.context.TaskContext; +import org.apache.shardingsphere.elasticjob.kernel.internal.failover.FailoverService; +import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ExecutionContextService; +import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ExecutionService; +import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ShardingService; +import org.apache.shardingsphere.elasticjob.kernel.tracing.config.TracingConfiguration; +import org.apache.shardingsphere.elasticjob.kernel.tracing.event.JobTracingEventBus; +import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter; +import org.apache.shardingsphere.elasticjob.spi.executor.item.param.JobRuntimeService; +import org.apache.shardingsphere.elasticjob.spi.listener.ElasticJobListener; +import org.apache.shardingsphere.elasticjob.spi.listener.param.ShardingContexts; +import org.apache.shardingsphere.elasticjob.spi.tracing.event.JobExecutionEvent; +import org.apache.shardingsphere.elasticjob.spi.tracing.event.JobStatusTraceEvent; +import org.apache.shardingsphere.elasticjob.spi.tracing.event.JobStatusTraceEvent.State; + +import lombok.extern.slf4j.Slf4j; + +/** + * Abstract Job facade. + */ +@Slf4j +abstract class AbstractJobFacade implements JobFacade { + + protected final ConfigurationService configService; + + protected final ShardingService shardingService; + + protected final ExecutionContextService executionContextService; + + protected final ExecutionService executionService; + + protected final FailoverService failoverService; + + protected final Collection elasticJobListeners; + + protected final JobTracingEventBus jobTracingEventBus; + + public AbstractJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final Collection elasticJobListeners, final TracingConfiguration tracingConfig) { + configService = new ConfigurationService(regCenter, jobName); + shardingService = new ShardingService(regCenter, jobName); + executionContextService = new ExecutionContextService(regCenter, jobName); + executionService = new ExecutionService(regCenter, jobName); + failoverService = new FailoverService(regCenter, jobName); + this.elasticJobListeners = elasticJobListeners.stream().sorted(Comparator.comparingInt(ElasticJobListener::order)).collect(Collectors.toList()); + this.jobTracingEventBus = null == tracingConfig ? new JobTracingEventBus() : new JobTracingEventBus(tracingConfig); + } + + /** + * Load job configuration. + * + * @param fromCache load from cache or not + * @return job configuration + */ + @Override + public JobConfiguration loadJobConfiguration(final boolean fromCache) { + return configService.load(fromCache); + } + + /** + * Check job execution environment. + * + * @throws org.apache.shardingsphere.elasticjob.kernel.infra.exception.JobExecutionEnvironmentException job execution environment exception + */ + @Override + public void checkJobExecutionEnvironment() throws JobExecutionEnvironmentException { + configService.checkMaxTimeDiffSecondsTolerable(); + } + + /** + * Failover If necessary. + */ + @Override + public void failoverIfNecessary() { + if (configService.load(true).isFailover()) { + failoverService.failoverIfNecessary(); + } + } + + /** + * Register job begin. + * + * @param shardingContexts sharding contexts + */ + @Override + public void registerJobBegin(final ShardingContexts shardingContexts) { + executionService.registerJobBegin(shardingContexts); + } + + /** + * Register job completed. + * + * @param shardingContexts sharding contexts + */ + @Override + public void registerJobCompleted(final ShardingContexts shardingContexts) { + executionService.registerJobCompleted(shardingContexts); + if (configService.load(true).isFailover()) { + failoverService.updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet()); + } + } + + public abstract ShardingContexts getShardingContexts(); + + /** + * Set task misfire flag. + * + * @param shardingItems sharding items to be set misfire flag + * @return whether satisfy misfire condition + */ + @Override + public boolean misfireIfRunning(final Collection shardingItems) { + return executionService.misfireIfHasRunningItems(shardingItems); + } + + /** + * Clear misfire flag. + * + * @param shardingItems sharding items to be cleared misfire flag + */ + @Override + public void clearMisfire(final Collection shardingItems) { + executionService.clearMisfire(shardingItems); + } + + /** + * Judge job whether to need to execute misfire tasks. + * + * @param shardingItems sharding items + * @return need to execute misfire tasks or not + */ + @Override + public boolean isExecuteMisfired(final Collection shardingItems) { + return configService.load(true).isMisfire() && !isNeedSharding() && !executionService.getMisfiredJobItems(shardingItems).isEmpty(); + } + + /** + * Judge job whether to need resharding. + * + * @return need resharding or not + */ + @Override + public boolean isNeedSharding() { + return shardingService.isNeedSharding(); + } + + /** + * Call before job executed. + * + * @param shardingContexts sharding contexts + */ + @Override + public void beforeJobExecuted(final ShardingContexts shardingContexts) { + for (ElasticJobListener each : elasticJobListeners) { + each.beforeJobExecuted(shardingContexts); + } + } + + /** + * Call after job executed. + * + * @param shardingContexts sharding contexts + */ + @Override + public void afterJobExecuted(final ShardingContexts shardingContexts) { + for (ElasticJobListener each : elasticJobListeners) { + each.afterJobExecuted(shardingContexts); + } + } + + /** + * Post job execution event. + * + * @param jobExecutionEvent job execution event + */ + @Override + public void postJobExecutionEvent(final JobExecutionEvent jobExecutionEvent) { + jobTracingEventBus.post(jobExecutionEvent); + } + + /** + * Post job status trace event. + * + * @param taskId task Id + * @param state job state + * @param message job message + */ + @Override + public void postJobStatusTraceEvent(final String taskId, final State state, final String message) { + TaskContext taskContext = TaskContext.from(taskId); + jobTracingEventBus.post(new JobStatusTraceEvent(taskContext.getMetaInfo().getJobName(), taskContext.getId(), + taskContext.getSlaveId(), taskContext.getType(), taskContext.getMetaInfo().getShardingItems().toString(), state, message)); + if (!Strings.isNullOrEmpty(message)) { + log.trace(message); + } + } + + /** + * Get job runtime service. + * + * @return job runtime service + */ + @Override + public JobRuntimeService getJobRuntimeService() { + return new JobJobRuntimeServiceImpl(this); + } +} diff --git a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/JobFacade.java b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/JobFacade.java index 7c6eaf040d..2dbb774024 100644 --- a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/JobFacade.java +++ b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/JobFacade.java @@ -17,131 +17,60 @@ package org.apache.shardingsphere.elasticjob.kernel.executor.facade; -import com.google.common.base.Strings; -import lombok.extern.slf4j.Slf4j; +import java.util.Collection; + import org.apache.shardingsphere.elasticjob.api.JobConfiguration; import org.apache.shardingsphere.elasticjob.kernel.infra.exception.JobExecutionEnvironmentException; -import org.apache.shardingsphere.elasticjob.spi.listener.ElasticJobListener; -import org.apache.shardingsphere.elasticjob.spi.listener.param.ShardingContexts; -import org.apache.shardingsphere.elasticjob.kernel.internal.config.ConfigurationService; -import org.apache.shardingsphere.elasticjob.kernel.internal.context.TaskContext; -import org.apache.shardingsphere.elasticjob.kernel.internal.failover.FailoverService; -import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ExecutionContextService; -import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ExecutionService; -import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ShardingService; -import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter; import org.apache.shardingsphere.elasticjob.spi.executor.item.param.JobRuntimeService; -import org.apache.shardingsphere.elasticjob.kernel.tracing.event.JobTracingEventBus; -import org.apache.shardingsphere.elasticjob.kernel.tracing.config.TracingConfiguration; +import org.apache.shardingsphere.elasticjob.spi.listener.param.ShardingContexts; import org.apache.shardingsphere.elasticjob.spi.tracing.event.JobExecutionEvent; import org.apache.shardingsphere.elasticjob.spi.tracing.event.JobStatusTraceEvent; -import org.apache.shardingsphere.elasticjob.spi.tracing.event.JobStatusTraceEvent.State; - -import java.util.Collection; -import java.util.Comparator; -import java.util.List; -import java.util.stream.Collectors; /** * Job facade. */ -@Slf4j -public final class JobFacade { - - private final ConfigurationService configService; - - private final ShardingService shardingService; - - private final ExecutionContextService executionContextService; - - private final ExecutionService executionService; - - private final FailoverService failoverService; - - private final Collection elasticJobListeners; - - private final JobTracingEventBus jobTracingEventBus; - - public JobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final Collection elasticJobListeners, final TracingConfiguration tracingConfig) { - configService = new ConfigurationService(regCenter, jobName); - shardingService = new ShardingService(regCenter, jobName); - executionContextService = new ExecutionContextService(regCenter, jobName); - executionService = new ExecutionService(regCenter, jobName); - failoverService = new FailoverService(regCenter, jobName); - this.elasticJobListeners = elasticJobListeners.stream().sorted(Comparator.comparingInt(ElasticJobListener::order)).collect(Collectors.toList()); - this.jobTracingEventBus = null == tracingConfig ? new JobTracingEventBus() : new JobTracingEventBus(tracingConfig); - } +public interface JobFacade { /** * Load job configuration. - * + * * @param fromCache load from cache or not * @return job configuration */ - public JobConfiguration loadJobConfiguration(final boolean fromCache) { - return configService.load(fromCache); - } + JobConfiguration loadJobConfiguration(final boolean fromCache); /** * Check job execution environment. - * + * * @throws JobExecutionEnvironmentException job execution environment exception */ - public void checkJobExecutionEnvironment() throws JobExecutionEnvironmentException { - configService.checkMaxTimeDiffSecondsTolerable(); - } + void checkJobExecutionEnvironment() throws JobExecutionEnvironmentException; /** * Failover If necessary. */ - public void failoverIfNecessary() { - if (configService.load(true).isFailover()) { - failoverService.failoverIfNecessary(); - } - } + void failoverIfNecessary(); /** * Register job begin. * * @param shardingContexts sharding contexts */ - public void registerJobBegin(final ShardingContexts shardingContexts) { - executionService.registerJobBegin(shardingContexts); - } + void registerJobBegin(ShardingContexts shardingContexts); /** * Register job completed. * * @param shardingContexts sharding contexts */ - public void registerJobCompleted(final ShardingContexts shardingContexts) { - executionService.registerJobCompleted(shardingContexts); - if (configService.load(true).isFailover()) { - failoverService.updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet()); - } - } + void registerJobCompleted(ShardingContexts shardingContexts); /** * Get sharding contexts. * * @return sharding contexts */ - public ShardingContexts getShardingContexts() { - boolean isFailover = configService.load(true).isFailover(); - if (isFailover) { - List failoverShardingItems = failoverService.getLocalFailoverItems(); - if (!failoverShardingItems.isEmpty()) { - return executionContextService.getJobShardingContext(failoverShardingItems); - } - } - shardingService.shardingIfNecessary(); - List shardingItems = shardingService.getLocalShardingItems(); - if (isFailover) { - shardingItems.removeAll(failoverService.getLocalTakeOffItems()); - } - shardingItems.removeAll(executionService.getDisabledItems(shardingItems)); - return executionContextService.getJobShardingContext(shardingItems); - } + ShardingContexts getShardingContexts(); /** * Set task misfire flag. @@ -149,68 +78,50 @@ public ShardingContexts getShardingContexts() { * @param shardingItems sharding items to be set misfire flag * @return whether satisfy misfire condition */ - public boolean misfireIfRunning(final Collection shardingItems) { - return executionService.misfireIfHasRunningItems(shardingItems); - } + boolean misfireIfRunning(Collection shardingItems); /** * Clear misfire flag. * * @param shardingItems sharding items to be cleared misfire flag */ - public void clearMisfire(final Collection shardingItems) { - executionService.clearMisfire(shardingItems); - } + void clearMisfire(Collection shardingItems); /** - * Judge job whether to need to execute misfire tasks. - * + * Judge job whether need to execute misfire tasks. + * * @param shardingItems sharding items - * @return need to execute misfire tasks or not + * @return whether need to execute misfire tasks */ - public boolean isExecuteMisfired(final Collection shardingItems) { - return configService.load(true).isMisfire() && !isNeedSharding() && !executionService.getMisfiredJobItems(shardingItems).isEmpty(); - } + boolean isExecuteMisfired(Collection shardingItems); /** - * Judge job whether to need resharding. + * Judge job whether need resharding. * - * @return need resharding or not + * @return whether need resharding */ - public boolean isNeedSharding() { - return shardingService.isNeedSharding(); - } + boolean isNeedSharding(); /** * Call before job executed. * * @param shardingContexts sharding contexts */ - public void beforeJobExecuted(final ShardingContexts shardingContexts) { - for (ElasticJobListener each : elasticJobListeners) { - each.beforeJobExecuted(shardingContexts); - } - } + void beforeJobExecuted(ShardingContexts shardingContexts); /** * Call after job executed. * * @param shardingContexts sharding contexts */ - public void afterJobExecuted(final ShardingContexts shardingContexts) { - for (ElasticJobListener each : elasticJobListeners) { - each.afterJobExecuted(shardingContexts); - } - } + void afterJobExecuted(ShardingContexts shardingContexts); /** * Post job execution event. * * @param jobExecutionEvent job execution event */ - public void postJobExecutionEvent(final JobExecutionEvent jobExecutionEvent) { - jobTracingEventBus.post(jobExecutionEvent); - } + void postJobExecutionEvent(final JobExecutionEvent jobExecutionEvent); /** * Post job status trace event. @@ -219,21 +130,12 @@ public void postJobExecutionEvent(final JobExecutionEvent jobExecutionEvent) { * @param state job state * @param message job message */ - public void postJobStatusTraceEvent(final String taskId, final State state, final String message) { - TaskContext taskContext = TaskContext.from(taskId); - jobTracingEventBus.post(new JobStatusTraceEvent(taskContext.getMetaInfo().getJobName(), taskContext.getId(), - taskContext.getSlaveId(), taskContext.getType(), taskContext.getMetaInfo().getShardingItems().toString(), state, message)); - if (!Strings.isNullOrEmpty(message)) { - log.trace(message); - } - } - + void postJobStatusTraceEvent(String taskId, JobStatusTraceEvent.State state, String message); + /** * Get job runtime service. * * @return job runtime service */ - public JobRuntimeService getJobRuntimeService() { - return new JobJobRuntimeServiceImpl(this); - } + JobRuntimeService getJobRuntimeService(); } diff --git a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/ShardingJobFacade.java b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/ShardingJobFacade.java new file mode 100644 index 0000000000..040fd3e44c --- /dev/null +++ b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/ShardingJobFacade.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.elasticjob.kernel.executor.facade; + +import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.elasticjob.spi.listener.ElasticJobListener; +import org.apache.shardingsphere.elasticjob.spi.listener.param.ShardingContexts; +import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter; +import org.apache.shardingsphere.elasticjob.kernel.tracing.config.TracingConfiguration; + +import java.util.Collection; +import java.util.List; + +/** + * Sharding Job facade. + */ +@Slf4j +public final class ShardingJobFacade extends AbstractJobFacade { + + public ShardingJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final Collection elasticJobListeners, final TracingConfiguration tracingConfig) { + super(regCenter, jobName, elasticJobListeners, tracingConfig); + } + + /** + * Get sharding contexts. + * + * @return sharding contexts + */ + @Override + public ShardingContexts getShardingContexts() { + boolean isFailover = configService.load(true).isFailover(); + if (isFailover) { + List failoverShardingItems = failoverService.getLocalFailoverItems(); + if (!failoverShardingItems.isEmpty()) { + return executionContextService.getJobShardingContext(failoverShardingItems); + } + } + shardingService.shardingIfNecessary(); + List shardingItems = shardingService.getLocalShardingItems(); + if (isFailover) { + shardingItems.removeAll(failoverService.getLocalTakeOffItems()); + } + shardingItems.removeAll(executionService.getDisabledItems(shardingItems)); + return executionContextService.getJobShardingContext(shardingItems); + } + +} diff --git a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/SingleShardingJobFacade.java b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/SingleShardingJobFacade.java new file mode 100644 index 0000000000..44e5c01446 --- /dev/null +++ b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/SingleShardingJobFacade.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.elasticjob.kernel.executor.facade; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.apache.commons.lang3.StringUtils; +import org.apache.shardingsphere.elasticjob.api.JobConfiguration; +import org.apache.shardingsphere.elasticjob.kernel.internal.instance.InstanceService; +import org.apache.shardingsphere.elasticjob.kernel.internal.schedule.JobRegistry; +import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.JobInstance; +import org.apache.shardingsphere.elasticjob.kernel.internal.storage.JobNodeStorage; +import org.apache.shardingsphere.elasticjob.kernel.tracing.config.TracingConfiguration; +import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter; +import org.apache.shardingsphere.elasticjob.spi.listener.ElasticJobListener; +import org.apache.shardingsphere.elasticjob.spi.listener.param.ShardingContexts; + +import lombok.extern.slf4j.Slf4j; + +/** + * Single Sharding Job facade. + */ +@Slf4j +public final class SingleShardingJobFacade extends AbstractJobFacade { + + private final JobNodeStorage jobNodeStorage; + private final InstanceService instanceService; + + public SingleShardingJobFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final Collection elasticJobListeners, final TracingConfiguration tracingConfig) { + super(regCenter, jobName, elasticJobListeners, tracingConfig); + + jobNodeStorage = new JobNodeStorage(regCenter, jobName); + instanceService = new InstanceService(regCenter, jobName); + } + + @Override + public void registerJobCompleted(final ShardingContexts shardingContexts) { + super.registerJobCompleted(shardingContexts); + + JobConfiguration jobConfig = configService.load(true); + JobInstance jobInst = JobRegistry.getInstance().getJobInstance(jobConfig.getJobName()); + if (null == jobInst) { + log.warn("Error! Can't find the job instance with name:{}", jobConfig.getJobName()); + return; + } + Integer nextIndex = null; + List availJobInst = instanceService.getAvailableJobInstances(); + for (int i = 0; i < availJobInst.size(); i++) { + JobInstance temp = availJobInst.get(i); + if (temp.getServerIp().equals(jobInst.getServerIp())) { + nextIndex = i + 1; // find the current running job instance, and set next one to current index + 1 + break; + } + } + if (nextIndex != null) { // the normal case that can find the next index, exclude the bounded scenarios + nextIndex = nextIndex >= availJobInst.size() ? 0 : nextIndex; // Round Robin Loop + jobNodeStorage.fillEphemeralJobNode("next-job-instance-ip", availJobInst.get(nextIndex).getServerIp()); + } + + if (log.isDebugEnabled()) { + log.debug("job name: {}, next index: {}, sharding total count: {}", + jobConfig.getJobName(), nextIndex, jobConfig.getShardingTotalCount()); + } + } + + /** + * Get sharding contexts. + * + * @return sharding contexts + */ + @Override + public ShardingContexts getShardingContexts() { + JobConfiguration jobConfig = configService.load(true); + boolean isFailover = jobConfig.isFailover(); + if (isFailover) { + List failoverShardingItems = failoverService.getLocalFailoverItems(); + if (!failoverShardingItems.isEmpty()) { + return executionContextService.getJobShardingContext(failoverShardingItems); + } + } + + List shardingItems; + String nextJobInstIP = null; + if (isNeedSharding()) { // the first initialization or reconcile case + shardingService.shardingIfNecessary(); + shardingItems = shardingService.getLocalShardingItems(); + } else { + nextJobInstIP = jobNodeStorage.getJobNodeDataDirectly("next-job-instance-ip"); + if (StringUtils.isBlank(nextJobInstIP)) { // if there is no next job instance ip + shardingService.shardingIfNecessary(); + shardingItems = shardingService.getLocalShardingItems(); + } else { // when next job instance is specified under normal case + JobInstance jobInst = JobRegistry.getInstance().getJobInstance(jobConfig.getJobName()); + shardingItems = nextJobInstIP.equals(jobInst.getServerIp()) ? Collections.singletonList(0) : new ArrayList<>(); + } + } + if (log.isDebugEnabled()) { + log.debug("job name: {}, sharding items: {}, nextJobInstIP: {}, sharding total count: {}, isFailover: {}", + jobConfig.getJobName(), shardingItems, nextJobInstIP, jobConfig.getShardingTotalCount(), isFailover); + } + + if (isFailover) { + shardingItems.removeAll(failoverService.getLocalTakeOffItems()); + } + shardingItems.removeAll(executionService.getDisabledItems(shardingItems)); + return executionContextService.getJobShardingContext(shardingItems); + } + +} diff --git a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/JobScheduler.java b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/JobScheduler.java index 0311b6fc6f..29bf9ea9e8 100644 --- a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/JobScheduler.java +++ b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/JobScheduler.java @@ -22,9 +22,11 @@ import lombok.Getter; import org.apache.shardingsphere.elasticjob.api.ElasticJob; import org.apache.shardingsphere.elasticjob.api.JobConfiguration; +import org.apache.shardingsphere.elasticjob.kernel.executor.facade.SingleShardingJobFacade; import org.apache.shardingsphere.elasticjob.spi.executor.error.handler.JobErrorHandlerPropertiesValidator; import org.apache.shardingsphere.elasticjob.kernel.executor.ElasticJobExecutor; import org.apache.shardingsphere.elasticjob.kernel.infra.exception.JobSystemException; +import org.apache.shardingsphere.elasticjob.kernel.executor.facade.ShardingJobFacade; import org.apache.shardingsphere.elasticjob.kernel.executor.facade.JobFacade; import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.JobInstance; import org.apache.shardingsphere.elasticjob.spi.listener.ElasticJobListener; @@ -80,7 +82,14 @@ public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob Collection jobListeners = getElasticJobListeners(this.jobConfig); setUpFacade = new SetUpFacade(regCenter, this.jobConfig.getJobName(), jobListeners); schedulerFacade = new SchedulerFacade(regCenter, this.jobConfig.getJobName()); - jobFacade = new JobFacade(regCenter, this.jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null)); + + if (1 == this.jobConfig.getShardingTotalCount() // the single sharding scenario + && "SINGLE_SHARDING_BALANCE".equals(this.jobConfig.getJobShardingStrategyType())) { // the specified SINGLE_SHARDING_BALANCE strategy + jobFacade = new SingleShardingJobFacade(regCenter, this.jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null)); + } else { + jobFacade = new ShardingJobFacade(regCenter, this.jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null)); + } + validateJobProperties(); jobExecutor = new ElasticJobExecutor(elasticJob, this.jobConfig, jobFacade); setGuaranteeServiceForElasticJobListeners(regCenter, jobListeners); @@ -94,7 +103,14 @@ public JobScheduler(final CoordinatorRegistryCenter regCenter, final String elas Collection jobListeners = getElasticJobListeners(this.jobConfig); setUpFacade = new SetUpFacade(regCenter, this.jobConfig.getJobName(), jobListeners); schedulerFacade = new SchedulerFacade(regCenter, this.jobConfig.getJobName()); - jobFacade = new JobFacade(regCenter, this.jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null)); + + if (1 == this.jobConfig.getShardingTotalCount() // the single sharding scenario + && "SINGLE_SHARDING_BALANCE".equals(this.jobConfig.getJobShardingStrategyType())) { // the specified SINGLE_SHARDING_BALANCE strategy + jobFacade = new SingleShardingJobFacade(regCenter, this.jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null)); + } else { + jobFacade = new ShardingJobFacade(regCenter, this.jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null)); + } + validateJobProperties(); jobExecutor = new ElasticJobExecutor(elasticJobType, this.jobConfig, jobFacade); setGuaranteeServiceForElasticJobListeners(regCenter, jobListeners); diff --git a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/sharding/strategy/type/SingleShardingBalanceJobShardingStrategy.java b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/sharding/strategy/type/SingleShardingBalanceJobShardingStrategy.java new file mode 100644 index 0000000000..3be60fa93d --- /dev/null +++ b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/sharding/strategy/type/SingleShardingBalanceJobShardingStrategy.java @@ -0,0 +1,47 @@ +package org.apache.shardingsphere.elasticjob.kernel.internal.sharding.strategy.type; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.JobInstance; +import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.strategy.JobShardingStrategy; + +/** + * Single sharding Balance strategy, referenced of ROUND_ROBIN strategy. + *
+ * it resolves the problem which ROUND_ROBIN is stick with the certain one job instance
+ * for the hashcode of job name is a constant value. while with SINGLE_SHARDING_BALANCE, it allows
+ * the job running on all the job instances each one by one, just like loop the job instances.
+ *
+ * this is the real round robin balance job running in the job instance dimension.
+ * 
+ * + * @author hongzhu + * @version V1.0 + * @since 2024-12-03 19:19 + */ +public class SingleShardingBalanceJobShardingStrategy implements JobShardingStrategy { + + private final AverageAllocationJobShardingStrategy averageAllocationJobShardingStrategy = new AverageAllocationJobShardingStrategy(); + + @Override + public Map> sharding(final List jobInstances, final String jobName, final int shardingTotalCount) { + int shardingUnitsSize = jobInstances.size(); + int offset = Math.abs(jobName.hashCode() + ((Long)System.currentTimeMillis()).intValue()) % shardingUnitsSize; + + List result = new ArrayList<>(shardingUnitsSize); + for (int i = 0; i < shardingUnitsSize; i++) { + int index = (i + offset) % shardingUnitsSize; + result.add(jobInstances.get(index)); + } + + return averageAllocationJobShardingStrategy.sharding(result, jobName, shardingTotalCount); + } + + @Override + public String getType() { + return "SINGLE_SHARDING_BALANCE"; + } + +} diff --git a/kernel/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.kernel.internal.sharding.strategy.JobShardingStrategy b/kernel/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.kernel.internal.sharding.strategy.JobShardingStrategy index 16310868c2..f1fa156d0d 100644 --- a/kernel/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.kernel.internal.sharding.strategy.JobShardingStrategy +++ b/kernel/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.kernel.internal.sharding.strategy.JobShardingStrategy @@ -18,3 +18,4 @@ org.apache.shardingsphere.elasticjob.kernel.internal.sharding.strategy.type.AverageAllocationJobShardingStrategy org.apache.shardingsphere.elasticjob.kernel.internal.sharding.strategy.type.OdevitySortByNameJobShardingStrategy org.apache.shardingsphere.elasticjob.kernel.internal.sharding.strategy.type.RoundRobinByNameJobShardingStrategy +org.apache.shardingsphere.elasticjob.kernel.internal.sharding.strategy.type.SingleShardingBalanceJobShardingStrategy diff --git a/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/JobFacadeTest.java b/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/ShardingJobFacadeTest.java similarity index 81% rename from kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/JobFacadeTest.java rename to kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/ShardingJobFacadeTest.java index 8806af610f..61bde8f032 100644 --- a/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/JobFacadeTest.java +++ b/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/ShardingJobFacadeTest.java @@ -46,7 +46,7 @@ import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) -class JobFacadeTest { +class ShardingJobFacadeTest { @Mock private ConfigurationService configService; @@ -69,54 +69,54 @@ class JobFacadeTest { @Mock private ElasticJobListenerCaller caller; - private JobFacade jobFacade; + private ShardingJobFacade shardingJobFacade; private StringBuilder orderResult; @BeforeEach void setUp() { orderResult = new StringBuilder(); - jobFacade = new JobFacade(null, "test_job", + shardingJobFacade = new ShardingJobFacade(null, "test_job", Arrays.asList(new TestElasticJobListener(caller, "l1", 2, orderResult), new TestElasticJobListener(caller, "l2", 1, orderResult)), null); - ReflectionUtils.setFieldValue(jobFacade, "configService", configService); - ReflectionUtils.setFieldValue(jobFacade, "shardingService", shardingService); - ReflectionUtils.setFieldValue(jobFacade, "executionContextService", executionContextService); - ReflectionUtils.setFieldValue(jobFacade, "executionService", executionService); - ReflectionUtils.setFieldValue(jobFacade, "failoverService", failoverService); - ReflectionUtils.setFieldValue(jobFacade, "jobTracingEventBus", jobTracingEventBus); + ReflectionUtils.setSuperclassFieldValue(shardingJobFacade, "configService", configService); + ReflectionUtils.setSuperclassFieldValue(shardingJobFacade, "shardingService", shardingService); + ReflectionUtils.setSuperclassFieldValue(shardingJobFacade, "executionContextService", executionContextService); + ReflectionUtils.setSuperclassFieldValue(shardingJobFacade, "executionService", executionService); + ReflectionUtils.setSuperclassFieldValue(shardingJobFacade, "failoverService", failoverService); + ReflectionUtils.setSuperclassFieldValue(shardingJobFacade, "jobTracingEventBus", jobTracingEventBus); } @Test void assertLoad() { JobConfiguration expected = JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").build(); when(configService.load(true)).thenReturn(expected); - assertThat(jobFacade.loadJobConfiguration(true), is(expected)); + assertThat(shardingJobFacade.loadJobConfiguration(true), is(expected)); } @Test void assertCheckMaxTimeDiffSecondsTolerable() throws JobExecutionEnvironmentException { - jobFacade.checkJobExecutionEnvironment(); + shardingJobFacade.checkJobExecutionEnvironment(); verify(configService).checkMaxTimeDiffSecondsTolerable(); } @Test void assertFailoverIfUnnecessary() { when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").failover(false).build()); - jobFacade.failoverIfNecessary(); + shardingJobFacade.failoverIfNecessary(); verify(failoverService, times(0)).failoverIfNecessary(); } @Test void assertFailoverIfNecessary() { when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").failover(true).monitorExecution(true).build()); - jobFacade.failoverIfNecessary(); + shardingJobFacade.failoverIfNecessary(); verify(failoverService).failoverIfNecessary(); } @Test void assertRegisterJobBegin() { ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 10, "", Collections.emptyMap()); - jobFacade.registerJobBegin(shardingContexts); + shardingJobFacade.registerJobBegin(shardingContexts); verify(executionService).registerJobBegin(shardingContexts); } @@ -124,7 +124,7 @@ void assertRegisterJobBegin() { void assertRegisterJobCompletedWhenFailoverDisabled() { ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 10, "", Collections.emptyMap()); when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").failover(false).build()); - jobFacade.registerJobCompleted(shardingContexts); + shardingJobFacade.registerJobCompleted(shardingContexts); verify(executionService).registerJobCompleted(shardingContexts); verify(failoverService, times(0)).updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet()); } @@ -133,7 +133,7 @@ void assertRegisterJobCompletedWhenFailoverDisabled() { void assertRegisterJobCompletedWhenFailoverEnabled() { ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 10, "", Collections.emptyMap()); when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").failover(true).monitorExecution(true).build()); - jobFacade.registerJobCompleted(shardingContexts); + shardingJobFacade.registerJobCompleted(shardingContexts); verify(executionService).registerJobCompleted(shardingContexts); verify(failoverService).updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet()); } @@ -144,7 +144,7 @@ void assertGetShardingContextWhenIsFailoverEnableAndFailover() { when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").failover(true).monitorExecution(true).build()); when(failoverService.getLocalFailoverItems()).thenReturn(Collections.singletonList(1)); when(executionContextService.getJobShardingContext(Collections.singletonList(1))).thenReturn(shardingContexts); - assertThat(jobFacade.getShardingContexts(), is(shardingContexts)); + assertThat(shardingJobFacade.getShardingContexts(), is(shardingContexts)); verify(shardingService, times(0)).shardingIfNecessary(); } @@ -156,7 +156,7 @@ void assertGetShardingContextWhenIsFailoverEnableAndNotFailover() { when(shardingService.getLocalShardingItems()).thenReturn(Lists.newArrayList(0, 1)); when(failoverService.getLocalTakeOffItems()).thenReturn(Collections.singletonList(0)); when(executionContextService.getJobShardingContext(Collections.singletonList(1))).thenReturn(shardingContexts); - assertThat(jobFacade.getShardingContexts(), is(shardingContexts)); + assertThat(shardingJobFacade.getShardingContexts(), is(shardingContexts)); verify(shardingService).shardingIfNecessary(); } @@ -166,7 +166,7 @@ void assertGetShardingContextWhenIsFailoverDisable() { when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 3).cron("0/1 * * * * ?").failover(false).build()); when(shardingService.getLocalShardingItems()).thenReturn(Arrays.asList(0, 1)); when(executionContextService.getJobShardingContext(Arrays.asList(0, 1))).thenReturn(shardingContexts); - assertThat(jobFacade.getShardingContexts(), is(shardingContexts)); + assertThat(shardingJobFacade.getShardingContexts(), is(shardingContexts)); verify(shardingService).shardingIfNecessary(); } @@ -177,45 +177,45 @@ void assertGetShardingContextWhenHasDisabledItems() { when(shardingService.getLocalShardingItems()).thenReturn(Lists.newArrayList(0, 1)); when(executionService.getDisabledItems(Arrays.asList(0, 1))).thenReturn(Collections.singletonList(1)); when(executionContextService.getJobShardingContext(Collections.singletonList(0))).thenReturn(shardingContexts); - assertThat(jobFacade.getShardingContexts(), is(shardingContexts)); + assertThat(shardingJobFacade.getShardingContexts(), is(shardingContexts)); verify(shardingService).shardingIfNecessary(); } @Test void assertMisfireIfRunning() { when(executionService.misfireIfHasRunningItems(Arrays.asList(0, 1))).thenReturn(true); - assertThat(jobFacade.misfireIfRunning(Arrays.asList(0, 1)), is(true)); + assertThat(shardingJobFacade.misfireIfRunning(Arrays.asList(0, 1)), is(true)); } @Test void assertClearMisfire() { - jobFacade.clearMisfire(Arrays.asList(0, 1)); + shardingJobFacade.clearMisfire(Arrays.asList(0, 1)); verify(executionService).clearMisfire(Arrays.asList(0, 1)); } @Test void assertIsNeedSharding() { when(shardingService.isNeedSharding()).thenReturn(true); - assertThat(jobFacade.isNeedSharding(), is(true)); + assertThat(shardingJobFacade.isNeedSharding(), is(true)); } @Test void assertBeforeJobExecuted() { - jobFacade.beforeJobExecuted(new ShardingContexts("fake_task_id", "test_job", 10, "", Collections.emptyMap())); + shardingJobFacade.beforeJobExecuted(new ShardingContexts("fake_task_id", "test_job", 10, "", Collections.emptyMap())); verify(caller, times(2)).before(); assertThat(orderResult.toString(), is("l2l1")); } @Test void assertAfterJobExecuted() { - jobFacade.afterJobExecuted(new ShardingContexts("fake_task_id", "test_job", 10, "", Collections.emptyMap())); + shardingJobFacade.afterJobExecuted(new ShardingContexts("fake_task_id", "test_job", 10, "", Collections.emptyMap())); verify(caller, times(2)).after(); assertThat(orderResult.toString(), is("l2l1")); } @Test void assertPostJobExecutionEvent() { - jobFacade.postJobExecutionEvent(null); + shardingJobFacade.postJobExecutionEvent(null); verify(jobTracingEventBus).post(null); } } diff --git a/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/SingleShardingJobFacadeTest.java b/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/SingleShardingJobFacadeTest.java new file mode 100644 index 0000000000..dc919c360c --- /dev/null +++ b/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/SingleShardingJobFacadeTest.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.elasticjob.kernel.executor.facade; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.Lists; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.shardingsphere.elasticjob.api.JobConfiguration; +import org.apache.shardingsphere.elasticjob.kernel.infra.exception.JobExecutionEnvironmentException; +import org.apache.shardingsphere.elasticjob.kernel.internal.config.ConfigurationService; +import org.apache.shardingsphere.elasticjob.kernel.internal.failover.FailoverService; +import org.apache.shardingsphere.elasticjob.kernel.internal.instance.InstanceService; +import org.apache.shardingsphere.elasticjob.kernel.internal.schedule.JobRegistry; +import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ExecutionContextService; +import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ExecutionService; +import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.JobInstance; +import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ShardingService; +import org.apache.shardingsphere.elasticjob.kernel.internal.storage.JobNodeStorage; +import org.apache.shardingsphere.elasticjob.kernel.listener.fixture.ElasticJobListenerCaller; +import org.apache.shardingsphere.elasticjob.kernel.listener.fixture.TestElasticJobListener; +import org.apache.shardingsphere.elasticjob.kernel.tracing.event.JobTracingEventBus; +import org.apache.shardingsphere.elasticjob.spi.listener.param.ShardingContexts; +import org.apache.shardingsphere.elasticjob.test.util.ReflectionUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class SingleShardingJobFacadeTest { + + @Mock + private ConfigurationService configService; + + @Mock + private ShardingService shardingService; + + @Mock + private ExecutionContextService executionContextService; + + @Mock + private ExecutionService executionService; + + @Mock + private FailoverService failoverService; + + @Mock + private JobTracingEventBus jobTracingEventBus; + + @Mock + private ElasticJobListenerCaller caller; + + @Mock + private JobNodeStorage jobNodeStorage; + + @Mock + private InstanceService instanceService; + + private SingleShardingJobFacade singleShardingJobFacade; + + private StringBuilder orderResult; + + @BeforeEach + void setUp() { + orderResult = new StringBuilder(); + singleShardingJobFacade = new SingleShardingJobFacade(null, "test_job", + Arrays.asList(new TestElasticJobListener(caller, "l1", 2, orderResult), new TestElasticJobListener(caller, "l2", 1, orderResult)), null); + ReflectionUtils.setSuperclassFieldValue(singleShardingJobFacade, "configService", configService); + ReflectionUtils.setSuperclassFieldValue(singleShardingJobFacade, "shardingService", shardingService); + ReflectionUtils.setSuperclassFieldValue(singleShardingJobFacade, "executionContextService", executionContextService); + ReflectionUtils.setSuperclassFieldValue(singleShardingJobFacade, "executionService", executionService); + ReflectionUtils.setSuperclassFieldValue(singleShardingJobFacade, "failoverService", failoverService); + ReflectionUtils.setSuperclassFieldValue(singleShardingJobFacade, "jobTracingEventBus", jobTracingEventBus); + ReflectionUtils.setFieldValue(singleShardingJobFacade, "jobNodeStorage", jobNodeStorage); + ReflectionUtils.setFieldValue(singleShardingJobFacade, "instanceService", instanceService); + } + + @Test + void assertLoad() { + JobConfiguration expected = JobConfiguration.newBuilder("test_job", 1).cron("0/1 * * * * ?").build(); + when(configService.load(true)).thenReturn(expected); + assertThat(singleShardingJobFacade.loadJobConfiguration(true), is(expected)); + } + + @Test + void assertCheckMaxTimeDiffSecondsTolerable() throws JobExecutionEnvironmentException { + singleShardingJobFacade.checkJobExecutionEnvironment(); + verify(configService).checkMaxTimeDiffSecondsTolerable(); + } + + @Test + void assertFailoverIfUnnecessary() { + when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 1).cron("0/1 * * * * ?").failover(false).build()); + singleShardingJobFacade.failoverIfNecessary(); + verify(failoverService, times(0)).failoverIfNecessary(); + } + + @Test + void assertFailoverIfNecessary() { + when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 1).cron("0/1 * * * * ?").failover(true).monitorExecution(true).build()); + singleShardingJobFacade.failoverIfNecessary(); + verify(failoverService).failoverIfNecessary(); + } + + @Test + void assertRegisterJobBegin() { + ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.emptyMap()); + singleShardingJobFacade.registerJobBegin(shardingContexts); + verify(executionService).registerJobBegin(shardingContexts); + } + + @Test + void assertRegisterJobCompletedWhenFailoverDisabled() { + ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.emptyMap()); + when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 1).cron("0/1 * * * * ?").failover(false).build()); + singleShardingJobFacade.registerJobCompleted(shardingContexts); + verify(executionService).registerJobCompleted(shardingContexts); + verify(failoverService, times(0)).updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet()); + } + + @Test + void assertRegisterJobCompletedWhenFailoverEnabled() { + ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.emptyMap()); + when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 1).cron("0/1 * * * * ?").failover(true).monitorExecution(true).build()); + singleShardingJobFacade.registerJobCompleted(shardingContexts); + verify(executionService).registerJobCompleted(shardingContexts); + verify(failoverService).updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet()); + } + + @Test + void assertRegisterJobCompletedWhenRunningOnCurrentHost() { + ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.emptyMap()); + when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 1).cron("0/1 * * * * ?").failover(true).monitorExecution(true).build()); + JobInstance jobInstance = new JobInstance(); + jobInstance.setServerIp("192.168.1.2"); + JobRegistry jobRegistry = JobRegistry.getInstance(); + jobRegistry.addJobInstance("test_job", jobInstance); + List availJobInst = new ArrayList<>(); + availJobInst.add(jobInstance); + JobInstance jobInstance2 = new JobInstance(); + jobInstance2.setServerIp("192.168.1.3"); + availJobInst.add(jobInstance2); + when(instanceService.getAvailableJobInstances()).thenReturn(availJobInst); + + singleShardingJobFacade.registerJobCompleted(shardingContexts); + + verify(executionService).registerJobCompleted(shardingContexts); + verify(failoverService).updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet()); + verify(jobNodeStorage).fillEphemeralJobNode("next-job-instance-ip", availJobInst.get(1).getServerIp()); + } + + @Test + void assertRegisterJobCompletedWhenRunningOnOtherHost() { + ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.emptyMap()); + when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 1).cron("0/1 * * * * ?").failover(true).monitorExecution(true).build()); + JobInstance jobInstance = new JobInstance(); + jobInstance.setServerIp("192.168.1.2"); + JobRegistry jobRegistry = JobRegistry.getInstance(); + jobRegistry.addJobInstance("test_job", jobInstance); + List availJobInst = new ArrayList<>(); + JobInstance jobInstance2 = new JobInstance(); + jobInstance2.setServerIp("192.168.1.3"); + availJobInst.add(jobInstance2); + when(instanceService.getAvailableJobInstances()).thenReturn(availJobInst); + + singleShardingJobFacade.registerJobCompleted(shardingContexts); + + verify(executionService).registerJobCompleted(shardingContexts); + verify(failoverService).updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet()); + verify(jobNodeStorage, times(0)).fillEphemeralJobNode("next-job-instance-ip", availJobInst.get(0).getServerIp()); + } + + @Test + void assertGetShardingContextWhenIsFailoverEnableAndFailover() { + ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.emptyMap()); + when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 1).cron("0/1 * * * * ?").failover(true).monitorExecution(true).build()); + when(failoverService.getLocalFailoverItems()).thenReturn(Collections.singletonList(0)); + when(executionContextService.getJobShardingContext(Collections.singletonList(0))).thenReturn(shardingContexts); + assertThat(singleShardingJobFacade.getShardingContexts(), is(shardingContexts)); + verify(shardingService, times(0)).shardingIfNecessary(); + } + + @Test + void assertGetShardingContextWhenIsFailoverEnableAndNotFailover() { + ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.emptyMap()); + when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 1).cron("0/1 * * * * ?").failover(true).monitorExecution(true).build()); + when(failoverService.getLocalFailoverItems()).thenReturn(Collections.emptyList()); + when(shardingService.getLocalShardingItems()).thenReturn(Lists.newArrayList(0)); + when(failoverService.getLocalTakeOffItems()).thenReturn(Collections.emptyList()); + when(executionContextService.getJobShardingContext(Collections.singletonList(0))).thenReturn(shardingContexts); + assertThat(singleShardingJobFacade.getShardingContexts(), is(shardingContexts)); + verify(shardingService).shardingIfNecessary(); + } + + @Test + void assertGetShardingContextWhenIsFailoverDisable() { + ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.emptyMap()); + when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 1).cron("0/1 * * * * ?").failover(false).build()); + when(shardingService.getLocalShardingItems()).thenReturn(Collections.singletonList(0)); + when(executionContextService.getJobShardingContext(Collections.singletonList(0))).thenReturn(shardingContexts); + assertThat(singleShardingJobFacade.getShardingContexts(), is(shardingContexts)); + verify(shardingService).shardingIfNecessary(); + } + + @Test + void assertGetShardingContextWhenHasDisabledItems() { + ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.emptyMap()); + when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 1).cron("0/1 * * * * ?").failover(false).build()); + when(shardingService.getLocalShardingItems()).thenReturn(Lists.newArrayList(0)); + when(executionService.getDisabledItems(Collections.singletonList(0))).thenReturn(Collections.singletonList(0)); + when(executionContextService.getJobShardingContext(Collections.emptyList())).thenReturn(shardingContexts); + assertThat(singleShardingJobFacade.getShardingContexts(), is(shardingContexts)); + verify(shardingService).shardingIfNecessary(); + } + + @Test + void assertGetShardingContextWhenIsFailoverDisableAndNoNeedShardingWithoutNextIP() { + ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.emptyMap()); + when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 1).cron("0/1 * * * * ?").failover(false).build()); + when(shardingService.isNeedSharding()).thenReturn(false); + when(jobNodeStorage.getJobNodeDataDirectly("next-job-instance-ip")).thenReturn(null); + when(shardingService.getLocalShardingItems()).thenReturn(Collections.singletonList(0)); + when(executionContextService.getJobShardingContext(Collections.singletonList(0))).thenReturn(shardingContexts); + + assertThat(singleShardingJobFacade.getShardingContexts(), is(shardingContexts)); + + verify(shardingService, times(1)).shardingIfNecessary(); + } + + @Test + void assertGetShardingContextWhenIsFailoverDisableAndNoNeedShardingWithNextIP() { + ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.emptyMap()); + when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 1).cron("0/1 * * * * ?").failover(false).build()); + when(shardingService.isNeedSharding()).thenReturn(false); + when(jobNodeStorage.getJobNodeDataDirectly("next-job-instance-ip")).thenReturn("192.168.1.2"); + JobInstance jobInstance = new JobInstance(); + jobInstance.setServerIp("192.168.1.2"); + JobRegistry jobRegistry = JobRegistry.getInstance(); + jobRegistry.addJobInstance("test_job", jobInstance); + when(executionContextService.getJobShardingContext(Collections.singletonList(0))).thenReturn(shardingContexts); + + assertThat(singleShardingJobFacade.getShardingContexts(), is(shardingContexts)); + + verify(shardingService, times(0)).shardingIfNecessary(); + } + + @Test + void assertGetShardingContextWhenIsFailoverDisableAndNeedSharding() { + ShardingContexts shardingContexts = new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.emptyMap()); + when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job", 1).cron("0/1 * * * * ?").failover(false).build()); + when(shardingService.getLocalShardingItems()).thenReturn(Collections.singletonList(0)); + when(shardingService.isNeedSharding()).thenReturn(true); + when(executionContextService.getJobShardingContext(Collections.singletonList(0))).thenReturn(shardingContexts); + + assertThat(singleShardingJobFacade.getShardingContexts(), is(shardingContexts)); + + verify(shardingService).shardingIfNecessary(); + } + + @Test + void assertMisfireIfRunning() { + when(executionService.misfireIfHasRunningItems(Arrays.asList(0, 1))).thenReturn(true); + assertThat(singleShardingJobFacade.misfireIfRunning(Arrays.asList(0, 1)), is(true)); + } + + @Test + void assertClearMisfire() { + singleShardingJobFacade.clearMisfire(Arrays.asList(0, 1)); + verify(executionService).clearMisfire(Arrays.asList(0, 1)); + } + + @Test + void assertIsNeedSharding() { + when(shardingService.isNeedSharding()).thenReturn(true); + assertThat(singleShardingJobFacade.isNeedSharding(), is(true)); + } + + @Test + void assertBeforeJobExecuted() { + singleShardingJobFacade.beforeJobExecuted(new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.emptyMap())); + verify(caller, times(2)).before(); + assertThat(orderResult.toString(), is("l2l1")); + } + + @Test + void assertAfterJobExecuted() { + singleShardingJobFacade.afterJobExecuted(new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.emptyMap())); + verify(caller, times(2)).after(); + assertThat(orderResult.toString(), is("l2l1")); + } + + @Test + void assertPostJobExecutionEvent() { + singleShardingJobFacade.postJobExecutionEvent(null); + verify(jobTracingEventBus).post(null); + } +} diff --git a/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/internal/sharding/strategy/type/SingleShardingBalanceJobShardingStrategyTest.java b/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/internal/sharding/strategy/type/SingleShardingBalanceJobShardingStrategyTest.java new file mode 100644 index 0000000000..d04d43820c --- /dev/null +++ b/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/internal/sharding/strategy/type/SingleShardingBalanceJobShardingStrategyTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.elasticjob.kernel.internal.sharding.strategy.type; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.apache.shardingsphere.elasticjob.kernel.internal.sharding.JobInstance; +import org.junit.jupiter.api.Test; + +class SingleShardingBalanceJobShardingStrategyTest { + + private final SingleShardingBalanceJobShardingStrategy singleShardingBalanceJobShardingStrategy = new SingleShardingBalanceJobShardingStrategy(); + + @Test + void assertSharding() { + Map> sharding = singleShardingBalanceJobShardingStrategy.sharding( + Arrays.asList(new JobInstance("host0@-@0"), new JobInstance("host1@-@0"), new JobInstance("host2@-@0")), + "JobName", 1); + int sum = sharding.values().stream().mapToInt(List::size).sum(); + assertThat(sum, is(1)); + } +}