Skip to content

Commit

Permalink
feat(core): Added distributor 3 scheduler (partial)
Browse files Browse the repository at this point in the history
  • Loading branch information
phinner committed Mar 21, 2024
1 parent cf158fc commit 4b3f977
Show file tree
Hide file tree
Showing 11 changed files with 681 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
public final class DistributorCorePlugin extends AbstractMindustryPlugin implements Distributor {

private final ServiceManager services = ServiceManager.simple();
private final MultiLocalizationSource source = MultiLocalizationSource.create();
private CommandFacade.@Nullable Factory factory = null;
private @Nullable PermissionManager permissions = null;
private final MultiLocalizationSource source = MultiLocalizationSource.create();

@Override
public ServiceManager getServiceManager() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Distributor, a feature-rich framework for Mindustry plugins.
*
* Copyright (C) 2024 Xpdustry
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.xpdustry.distributor.core.annotation;

import com.xpdustry.distributor.core.scheduler.Cancellable;
import com.xpdustry.distributor.core.scheduler.MindustryTimeUnit;
import com.xpdustry.distributor.core.scheduler.PluginScheduler;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Marks a method as a task handler, meaning it will be registered and called as a scheduled task in the
* {@link PluginScheduler}.
* <br>
* The annotated method can have one {@link Cancellable} parameter to allow the task to cancel itself.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface TaskHandler {

/**
* The interval between each execution of the task.
* The task will be executed once if the interval is set to a value below -1.
*/
long interval() default -1;

/**
* The initial delay before the first execution of the task.
* The task will be executed immediately if the delay is set to a value below -1.
*/
long delay() default -1;

/**
* The time unit of the interval and initial delay.
*/
MindustryTimeUnit unit() default MindustryTimeUnit.SECONDS;

/**
* Whether the task should be executed asynchronously.
*/
boolean async() default false;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Distributor, a feature-rich framework for Mindustry plugins.
*
* Copyright (C) 2024 Xpdustry
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.xpdustry.distributor.core.scheduler;

/**
* A {@code Cancellable} is used to cancel a task.
*/
public interface Cancellable {

/**
* Cancels the task bound to this {@code Cancellable}.
*/
void cancel();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Distributor, a feature-rich framework for Mindustry plugins.
*
* Copyright (C) 2024 Xpdustry
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.xpdustry.distributor.core.scheduler;

import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.jspecify.annotations.Nullable;

/**
* Time units used by the {@link PluginTaskBuilder} and {@link PluginTask} classes to represent time.
*/
public enum MindustryTimeUnit {

/**
* Time unit representing one thousandth of a second.
*/
MILLISECONDS(TimeUnit.MILLISECONDS),

/**
* Time unit representing one game loop, which is 60 times per second.
*/
TICKS(null),

/**
* Time unit representing one thousandth of a millisecond.
*/
SECONDS(TimeUnit.SECONDS),

/**
* Time unit representing sixty seconds.
*/
MINUTES(TimeUnit.MINUTES),

/**
* Time unit representing sixty minutes.
*/
HOURS(TimeUnit.HOURS),

/**
* Time unit representing twenty-four hours.
*/
DAYS(TimeUnit.DAYS);

private final @Nullable TimeUnit unit;

MindustryTimeUnit(final @Nullable TimeUnit unit) {
this.unit = unit;
}

/**
* Converts the given duration in the given time unit to this time unit.
* <p>
* Since this method is equivalent to {@link TimeUnit#convert(long, TimeUnit)}:
* <ul>
* <li>If it overflows, the result will be {@link Long#MAX_VALUE} if the duration is positive,
* or {@link Long#MIN_VALUE} if it is negative.</li>
* <li>Conversions are floored so converting 999 milliseconds to seconds results in 0.</li>
* </ul>
*
* @param sourceDuration the duration to convert
* @param sourceUnit the time unit of the duration
* @return the converted duration
* @see TimeUnit#convert(long, TimeUnit)
*/
public long convert(final long sourceDuration, final MindustryTimeUnit sourceUnit) {
if (this == sourceUnit) {
return sourceDuration;
}
final var sourceJavaUnit = sourceUnit.getJavaTimeUnit();
final var targetJavaUnit = this.getJavaTimeUnit();

if (sourceJavaUnit.isPresent() && targetJavaUnit.isPresent()) {
return targetJavaUnit.get().convert(sourceDuration, sourceJavaUnit.get());
} else if (sourceJavaUnit.isEmpty()) {
return targetJavaUnit
.orElseThrow()
.convert((long) Math.nextUp(sourceDuration * (1000F / 60F)), TimeUnit.MILLISECONDS);
} else {
final var millis = TimeUnit.MILLISECONDS.convert(sourceDuration, sourceJavaUnit.orElseThrow());
if (millis == Long.MAX_VALUE || millis == Long.MIN_VALUE) {
return millis;
}
return (long) (millis * (60F / 1000L));
}
}

/**
* Returns the Java time unit associated with this Mindustry time unit, if any.
*/
public Optional<TimeUnit> getJavaTimeUnit() {
return Optional.ofNullable(this.unit);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Distributor, a feature-rich framework for Mindustry plugins.
*
* Copyright (C) 2024 Xpdustry
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.xpdustry.distributor.core.scheduler;

import com.xpdustry.distributor.core.plugin.MindustryPlugin;

/**
* A {@code PluginScheduler} is used to schedule tasks for a plugin. A better alternative to {@link arc.util.Timer}.
*/
public interface PluginScheduler {

/**
* Returns a new {@link PluginTaskBuilder} instance scheduling a task.
*
* @param plugin the plugin to schedule the task for.
* @return a new {@link PluginTaskBuilder} instance.
*/
PluginTaskBuilder schedule(final MindustryPlugin plugin);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Distributor, a feature-rich framework for Mindustry plugins.
*
* Copyright (C) 2024 Xpdustry
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.xpdustry.distributor.core.scheduler;

import com.xpdustry.distributor.core.plugin.MindustryPlugin;
import com.xpdustry.distributor.core.plugin.PluginListener;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class PluginSchedulerImpl implements PluginScheduler, PluginListener {

static final String DISTRIBUTOR_WORKER_BASE_NAME = "distributor-worker-";
private static final Logger logger = LoggerFactory.getLogger("PluginScheduler");

private final Queue<PluginTaskImpl<?>> tasks =
new PriorityBlockingQueue<>(16, Comparator.comparing(PluginTaskImpl::getNextExecutionTime));
private final ForkJoinPool pool;
private final Executor syncExecutor;
private final TimeSource source;

public PluginSchedulerImpl(final TimeSource source, final Executor syncExecutor, final int parallelism) {
this.pool = new ForkJoinPool(parallelism, new PluginSchedulerWorkerThreadFactory(), null, false);
this.syncExecutor = syncExecutor;
this.source = source;
}

@Override
public PluginTaskBuilder schedule(final MindustryPlugin plugin) {
return new PluginTaskImpl.Builder(this, plugin);
}

@Override
public void onPluginUpdate() {
while (!this.tasks.isEmpty()) {
final var task = this.tasks.peek();
if (task.isCancelled()) {
this.tasks.remove();
} else if (task.getNextExecutionTime() < this.source.getCurrentTicks()) {
this.tasks.remove();
final Executor executor = task.isAsync() ? this.pool : this.syncExecutor;
executor.execute(task);
} else {
break;
}
}
}

@Override
public void onPluginExit() {
logger.info("Shutdown scheduler.");
this.pool.shutdown();
try {
if (!this.pool.awaitTermination(20, TimeUnit.SECONDS)) {
logger.error("Timed out waiting for the scheduler to terminate properly");
Thread.getAllStackTraces().forEach((thread, stack) -> {
if (thread.getName().startsWith(DISTRIBUTOR_WORKER_BASE_NAME)) {
logger.error(
"Worker thread {} may be blocked, possibly the reason for the slow shutdown:\n{}",
thread.getName(),
Arrays.stream(stack).map(e -> " " + e).collect(Collectors.joining("\n")));
}
});
}
} catch (final InterruptedException e) {
logger.error("The plugin scheduler shutdown have been interrupted.", e);
}
}

void schedule(final PluginTaskImpl<?> task) {
this.tasks.add(task);
}

TimeSource getTimeSource() {
return this.source;
}

boolean isShutdown() {
return this.pool.isShutdown();
}

private static final class PluginSchedulerWorkerThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory {

private static final AtomicInteger COUNT = new AtomicInteger(0);

@Override
public ForkJoinWorkerThread newThread(final ForkJoinPool pool) {
final var thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
thread.setName(DISTRIBUTOR_WORKER_BASE_NAME + COUNT.getAndIncrement());
return thread;
}
}
}
Loading

0 comments on commit 4b3f977

Please sign in to comment.