#2 limited executor #15

Closed
wants to merge 21 commits into from

Owner

@Adeynack Adeynack commented Jan 18, 2018

closes #2

Please let me merge this PR when it is ready.

@Adeynack
Owner Author

@meersdavy @semonte @nebulorum If you could have a quick look so I can close this one, that would be much appreciated! 😄

taskQueue.add(command);
} else {
final int actualTasksSubmitted = tasksSubmitted;
tasksSubmitted = actualTasksSubmitted + 1;

Why not tasksSubmitted++ instead of the two-line dance?

Suggested change
- tasksSubmitted = actualTasksSubmitted + 1;
+ tasksSubmitted++;

* indicates the number of tasks already submitted to the underlying {@link Executor}.
*/
@GuardedBy("mainLock")
private volatile int tasksSubmitted = 0;

Indeed, all access to this variable is guarded by the lock, so there is no need to mark it volatile.
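
To illustrate the point, here is a minimal Java sketch, assuming a `ReentrantLock` named `mainLock` like the one in this PR. The class name `GuardedCounter` and its two methods are hypothetical and only serve the example. Because every read and write happens while the lock is held, a plain `int` is enough.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the counter part of the executor under review.
final class GuardedCounter {
    private final ReentrantLock mainLock = new ReentrantLock();

    // Plain int, no volatile: the field is only touched while mainLock is held,
    // and unlocking/locking already makes the writes visible to the next holder.
    private int tasksSubmitted = 0;

    void increment() {
        mainLock.lock();
        try {
            tasksSubmitted++; // safe as a single statement under the lock
        } finally {
            mainLock.unlock();
        }
    }

    int getTasksSubmitted() {
        mainLock.lock();
        try {
            return tasksSubmitted;
        } finally {
            mainLock.unlock();
        }
    }
}
```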

@mkulak mkulak Nov 22, 2018

Also, the name is a bit confusing. One can interpret it as "number of tasks submitted to THIS executor", not to the inner one.
I would suggest a name like runningTasksCount (as those tasks are actually running while the others are just sitting in the queue).

} finally {
mainLock.unlock();
}
return actualTasksSubmitted;
@mkulak mkulak Nov 22, 2018

Also, using synchronized will result in much more concise code:

public synchronized int getTasksSubmitted() {
    return tasksSubmitted;
}

Even though it is kind of old-fashioned and not really encouraged these days.
UPD:
Just to clarify what I mean: if we get rid of mainLock and use synchronized everywhere, we end up with less code.

* @return a copy of the current queue of tasks.
*/
public Queue<Runnable> getTaskQueue() {
return new ArrayDeque<>(this.taskQueue);
@mkulak mkulak Nov 22, 2018

ATTENTION, TRICKY BUG!
Access to taskQueue should be guarded by the same lock.
If the linter doesn't complain about this one, then I suggest turning that linter off :)
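
A minimal sketch of what a guarded snapshot could look like, assuming the same `mainLock` protects every access to `taskQueue`. The class name `GuardedQueue` and the `enqueue` method are hypothetical. The copy is taken while holding the lock, so it cannot observe the `ArrayDeque` in the middle of a concurrent modification.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the queue part of the executor under review.
final class GuardedQueue {
    private final ReentrantLock mainLock = new ReentrantLock();
    private final Queue<Runnable> taskQueue = new ArrayDeque<>();

    void enqueue(Runnable task) {
        mainLock.lock();
        try {
            taskQueue.add(task);
        } finally {
            mainLock.unlock();
        }
    }

    /** @return a copy of the current queue of tasks, taken under the lock. */
    Queue<Runnable> getTaskQueue() {
        mainLock.lock();
        try {
            return new ArrayDeque<>(taskQueue);
        } finally {
            mainLock.unlock();
        }
    }
}
```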

return new ArrayDeque<>(this.taskQueue);
}

}
@mkulak mkulak Nov 22, 2018

Overall, this approach will of course work (if implemented correctly).
A minor suggestion would be to move the logic for incrementing tasksSubmitted and submitting tasks into one method, "tryExecute" or something similar, and then change execute to always put the task in the queue and call "tryExecute". Do the same after each task finishes: just call "tryExecute".
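
The restructuring described above could be sketched roughly as follows. This is only an illustration under assumptions: the class name `LockingLimitedExecutor`, the constructor parameters, and the helper `onTaskFinished` are hypothetical, the counter uses the `runningTasksCount` name proposed earlier in this review, and the base executor is assumed never to reject a task (a rejection would leave a slot reserved).

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;

final class LockingLimitedExecutor implements Executor {
    private final Executor baseExecutor;
    private final int parallelTaskLimit;
    private final ReentrantLock mainLock = new ReentrantLock();
    private final Queue<Runnable> taskQueue = new ArrayDeque<>();
    private int runningTasksCount = 0; // guarded by mainLock

    LockingLimitedExecutor(Executor baseExecutor, int parallelTaskLimit) {
        this.baseExecutor = baseExecutor;
        this.parallelTaskLimit = parallelTaskLimit;
    }

    @Override
    public void execute(Runnable command) {
        mainLock.lock();
        try {
            taskQueue.add(command); // always enqueue first
        } finally {
            mainLock.unlock();
        }
        tryExecute();
    }

    // Single place that decides whether a queued task may be handed over.
    private void tryExecute() {
        Runnable next;
        mainLock.lock();
        try {
            if (runningTasksCount >= parallelTaskLimit) return;
            next = taskQueue.poll();
            if (next == null) return;
            runningTasksCount++;
        } finally {
            mainLock.unlock();
        }
        final Runnable task = next;
        // Submit outside the lock so a slow base executor does not block others.
        baseExecutor.execute(() -> {
            try {
                task.run();
            } finally {
                onTaskFinished();
            }
        });
    }

    private void onTaskFinished() {
        mainLock.lock();
        try {
            runningTasksCount--;
        } finally {
            mainLock.unlock();
        }
        tryExecute(); // a slot is free again, try to start the next queued task
    }

    int getRunningTasksCount() {
        mainLock.lock();
        try {
            return runningTasksCount;
        } finally {
            mainLock.unlock();
        }
    }
}
```

With this shape, `execute` and task completion share one code path, so the limit check and the counter update live in exactly one method.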

A major suggestion would be to try to come up with a non-blocking implementation (just using a non-blocking queue) to reduce thread contention and maximize performance (parking a thread while it waits for a lock is still bad if you use non-blocking IO and the like).

@mkulak mkulak Nov 23, 2018

I sketched a possible non-blocking solution in Kotlin (I haven't tested it, it's just a raw idea):

import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.Executor
import java.util.concurrent.atomic.AtomicInteger

class LimitedExecutor(private val baseExecutor: Executor, private val parallelTaskLimit: Int) : Executor {
    private val taskQueue = ConcurrentLinkedDeque<Runnable>()
    private val tasksRunning = AtomicInteger()

    override fun execute(command: Runnable) {
        taskQueue.add(command)
        trySubmit()
    }

    private fun trySubmit() {
        val running = tasksRunning.get()
        if (running < parallelTaskLimit) {
            val task = taskQueue.poll()
            val res = tasksRunning.compareAndSet(running, running + 1)
            if (res) {
                baseExecutor.execute(createTaskWrapper(task))
            } else {
                taskQueue.addFirst(task)
            }
        }
    }

    private fun createTaskWrapper(command: Runnable): Runnable = Runnable {
        try {
            command.run()
        } finally {
            tasksRunning.decrementAndGet()
            trySubmit()
        }
    }

    fun getTasksSubmitted(): Int = tasksRunning.get()
}

UPD:
Nah, it won't work :( Need to think more.
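
For reference, here is one possible direction for a non-blocking variant, written in Java to match the PR. It is a sketch under assumptions and not something proposed or verified in this thread. In the Kotlin sketch above, `poll()` can return null and that null is still passed on, and a failed `compareAndSet` is never retried. The version below instead reserves a slot with `compareAndSet` first, polls afterwards, gives the slot back if the queue turned out to be empty, and loops until nothing more can be started. The class name `NonBlockingLimitedExecutor` and the method `getTasksRunning` are hypothetical, and the base executor is assumed never to reject a task.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

final class NonBlockingLimitedExecutor implements Executor {
    private final Executor baseExecutor;
    private final int parallelTaskLimit;
    private final ConcurrentLinkedQueue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger tasksRunning = new AtomicInteger();

    NonBlockingLimitedExecutor(Executor baseExecutor, int parallelTaskLimit) {
        this.baseExecutor = baseExecutor;
        this.parallelTaskLimit = parallelTaskLimit;
    }

    @Override
    public void execute(Runnable command) {
        taskQueue.add(command); // enqueue before looking at the counter
        trySubmit();
    }

    private void trySubmit() {
        while (true) {
            int running = tasksRunning.get();
            if (running >= parallelTaskLimit || taskQueue.isEmpty()) {
                return; // no free slot, or nothing to run
            }
            if (!tasksRunning.compareAndSet(running, running + 1)) {
                continue; // another thread changed the counter, re-check
            }
            Runnable task = taskQueue.poll();
            if (task == null) {
                // Someone else took the task: release the slot and re-check,
                // because a new task may have been queued in the meantime.
                tasksRunning.decrementAndGet();
                continue;
            }
            baseExecutor.execute(wrap(task));
        }
    }

    private Runnable wrap(Runnable command) {
        return () -> {
            try {
                command.run();
            } finally {
                tasksRunning.decrementAndGet(); // free the slot first
                trySubmit();                    // then look for queued work
            }
        };
    }

    int getTasksRunning() {
        return tasksRunning.get();
    }
}
```

The ordering is the important design choice: a submitter enqueues before reading the counter, and a finishing task decrements before checking the queue, so at least one of the two should notice the other's update.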

@Adeynack
Owner Author

Adeynack commented May 13, 2024

PR timed out. Keeping the branch open, however, to document the history.

@Adeynack Adeynack closed this May 13, 2024
Development

Successfully merging this pull request may close these issues.

LimitedExecutor
2 participants