Skip to content

Commit

Permalink
Use multiprocess if available
Browse files Browse the repository at this point in the history
  • Loading branch information
tbekolay committed Nov 16, 2021
1 parent 90f19fa commit bd82a09
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Changes
- Fix #373: read DOIT_CONFIG from TOML.
- Fix #405: Add Task attribute `meta`.
- Fix #349: Handle passing task args in "single" task execution.
- Fix #369: Support `multiprocess` if manually installed.

0.33.1 (*2020-09-04*)
=====================
Expand Down
2 changes: 2 additions & 0 deletions doc/cmd_run.rst
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ parallel execution
This allows different tasks to be run in parallel, as long any dependencies are met.
By default the `multiprocessing <http://docs.python.org/library/multiprocessing.html>`_
module is used.
If the `multiprocess <https://pypi.org/project/multiprocess/>`_ module is installed,
it will be used instead.
So the same restrictions also apply to the use of multiprocessing in `doit`.

.. code-block:: console
Expand Down
2 changes: 1 addition & 1 deletion doit/cmd_auto.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
import os
import time
import sys
from multiprocessing import Process
from subprocess import call

from .compat import Process
from .exceptions import InvalidCommand
from .cmdparse import CmdParse
from .filewatch import FileModifyWatcher
Expand Down
25 changes: 25 additions & 0 deletions doit/compat.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,30 @@
"""stuff dealing with incompatibilities between python versions"""

try:
from multiprocess import Process, Queue as MQueue
HAS_MULTIPROCESS = True
except ImportError:
from multiprocessing import Process, Queue as MQueue
HAS_MULTIPROCESS = False
Process # pyflakes
MQueue # pyflakes


def is_multiprocessing_available():
# see: http://bugs.python.org/issue3770
# not available on BSD systens
try:
if HAS_MULTIPROCESS:
import multiprocess.synchronize
multiprocess
else:
import multiprocessing.synchronize
multiprocessing
except ImportError: # pragma: no cover
return False
else:
return True


def get_platform_system():
"""return platform.system
Expand Down
13 changes: 2 additions & 11 deletions doit/runner.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
"""Task runner"""

from multiprocessing import Process, Queue as MQueue
from threading import Thread
import pickle
import queue

import cloudpickle

from .compat import is_multiprocessing_available, MQueue, Process
from .exceptions import InvalidTask, CatchedException
from .exceptions import TaskFailed, SetupError, DependencyError, UnmetDependency
from .task import Stream, DelayedLoaded
Expand Down Expand Up @@ -327,16 +327,7 @@ class MRunner(Runner):
@staticmethod
def available():
"""check if multiprocessing module is available"""
# see: https://bitbucket.org/schettino72/doit/issue/17
# http://bugs.python.org/issue3770
# not available on BSD systens
try:
import multiprocessing.synchronize
multiprocessing # pyflakes
except ImportError: # pragma: no cover
return False
else:
return True
return is_multiprocessing_available()

def __init__(self, dep_manager, reporter,
continue_=False, always_execute=False,
Expand Down
2 changes: 1 addition & 1 deletion tests/test_cmd_auto.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import time
from multiprocessing import Process

import pytest

from doit.compat import Process
from doit.cmdparse import DefaultUpdate
from doit.task import Task
from doit.cmd_base import TaskLoader
Expand Down
28 changes: 14 additions & 14 deletions tests/test_runner.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import os
import sys
import pickle
from multiprocessing import Queue
import platform
from unittest.mock import Mock

import pytest

from doit.compat import MQueue
from doit.exceptions import CatchedException, InvalidTask
from doit.dependency import DbmDB, Dependency
from doit.reporter import ConsoleReporter
Expand Down Expand Up @@ -557,7 +557,7 @@ def testSystemExitRaises(self, reporter, RunnerClass, dep_manager):
class TestMReporter(object):
class MyRunner(object):
def __init__(self):
self.result_q = Queue()
self.result_q = MQueue()

def testReporterMethod(self, reporter):
fake_runner = self.MyRunner()
Expand Down Expand Up @@ -696,8 +696,8 @@ def test_all_processes(self, reporter, monkeypatch, dep_manager):
td = TaskDispatcher({'t1':t1, 't2':t2}, [], ['t1', 't2'])
run = runner.MRunner(dep_manager, reporter, num_process=2)
run._run_tasks_init(td)
result_q = Queue()
task_q = Queue()
result_q = MQueue()
task_q = MQueue()

proc_list = run._run_start_processes(task_q, result_q)
run.finish()
Expand All @@ -714,8 +714,8 @@ def test_less_processes(self, reporter, monkeypatch, dep_manager):
td = TaskDispatcher({'t1':t1}, [], ['t1'])
run = runner.MRunner(dep_manager, reporter, num_process=2)
run._run_tasks_init(td)
result_q = Queue()
task_q = Queue()
result_q = MQueue()
task_q = MQueue()

proc_list = run._run_start_processes(task_q, result_q)
run.finish()
Expand All @@ -732,8 +732,8 @@ def test_waiting_process(self, reporter, monkeypatch, dep_manager):
td = TaskDispatcher({'t1':t1, 't2':t2}, [], ['t1', 't2'])
run = runner.MRunner(dep_manager, reporter, num_process=2)
run._run_tasks_init(td)
result_q = Queue()
task_q = Queue()
result_q = MQueue()
task_q = MQueue()

proc_list = run._run_start_processes(task_q, result_q)
run.finish()
Expand Down Expand Up @@ -779,10 +779,10 @@ def test_task_not_picklabe_thread(self, reporter, dep_manager):
class TestMRunner_execute_task(object):
def test_hold(self, reporter, dep_manager):
run = runner.MRunner(dep_manager, reporter)
task_q = Queue()
task_q = MQueue()
task_q.put(runner.JobHold()) # to test
task_q.put(None) # to terminate function
result_q = Queue()
result_q = MQueue()
run.execute_task_subprocess(task_q, result_q, reporter.__class__)
run.finish()
# nothing was done
Expand All @@ -792,10 +792,10 @@ def test_full_task(self, reporter, dep_manager):
# test execute_task_subprocess can receive a full Task object
run = runner.MRunner(dep_manager, reporter)
t1 = Task('t1', [simple_result])
task_q = Queue()
task_q = MQueue()
task_q.put(runner.JobTask(t1)) # to test
task_q.put(None) # to terminate function
result_q = Queue()
result_q = MQueue()
run.execute_task_subprocess(task_q, result_q, reporter.__class__)
run.finish()
# check result
Expand All @@ -812,10 +812,10 @@ def test_full_task_fail(self, reporter, dep_manager):
# test execute_task_subprocess can receive a full Task object
run = runner.MRunner(dep_manager, reporter)
t1 = Task('t1', [simple_fail])
task_q = Queue()
task_q = MQueue()
task_q.put(runner.JobTask(t1)) # to test
task_q.put(None) # to terminate function
result_q = Queue()
result_q = MQueue()
run.execute_task_subprocess(task_q, result_q, reporter.__class__)
run.finish()
# check result
Expand Down

0 comments on commit bd82a09

Please sign in to comment.