From bd82a09ee7f11a80a4b33cd055cd91a3f158347f Mon Sep 17 00:00:00 2001 From: Trevor Bekolay Date: Mon, 3 Aug 2020 21:38:44 -0500 Subject: [PATCH] Use multiprocess if available --- CHANGES | 1 + doc/cmd_run.rst | 2 ++ doit/cmd_auto.py | 2 +- doit/compat.py | 25 +++++++++++++++++++++++++ doit/runner.py | 13 ++----------- tests/test_cmd_auto.py | 2 +- tests/test_runner.py | 28 ++++++++++++++-------------- 7 files changed, 46 insertions(+), 27 deletions(-) diff --git a/CHANGES b/CHANGES index f954eef7..b3889596 100644 --- a/CHANGES +++ b/CHANGES @@ -15,6 +15,7 @@ Changes - Fix #373: read DOIT_CONFIG from TOML. - Fix #405: Add Task attribute `meta`. - Fix #349: Handle passing task args in "single" task execution. +- Fix #369: Support `multiprocess` if manually installed. 0.33.1 (*2020-09-04*) ===================== diff --git a/doc/cmd_run.rst b/doc/cmd_run.rst index 5d8be54a..a4d0d374 100644 --- a/doc/cmd_run.rst +++ b/doc/cmd_run.rst @@ -240,6 +240,8 @@ parallel execution This allows different tasks to be run in parallel, as long any dependencies are met. By default the `multiprocessing `_ module is used. +If the `multiprocess `_ module is installed, +it will be used instead. So the same restrictions also apply to the use of multiprocessing in `doit`. .. code-block:: console diff --git a/doit/cmd_auto.py b/doit/cmd_auto.py index 62f07613..64abefff 100644 --- a/doit/cmd_auto.py +++ b/doit/cmd_auto.py @@ -4,9 +4,9 @@ import os import time import sys -from multiprocessing import Process from subprocess import call +from .compat import Process from .exceptions import InvalidCommand from .cmdparse import CmdParse from .filewatch import FileModifyWatcher diff --git a/doit/compat.py b/doit/compat.py index a935e15e..808c542c 100644 --- a/doit/compat.py +++ b/doit/compat.py @@ -1,5 +1,30 @@ """stuff dealing with incompatibilities between python versions""" +try: + from multiprocess import Process, Queue as MQueue + HAS_MULTIPROCESS = True +except ImportError: + from multiprocessing import Process, Queue as MQueue + HAS_MULTIPROCESS = False +Process # pyflakes +MQueue # pyflakes + + +def is_multiprocessing_available(): + # see: http://bugs.python.org/issue3770 + # not available on BSD systens + try: + if HAS_MULTIPROCESS: + import multiprocess.synchronize + multiprocess + else: + import multiprocessing.synchronize + multiprocessing + except ImportError: # pragma: no cover + return False + else: + return True + def get_platform_system(): """return platform.system diff --git a/doit/runner.py b/doit/runner.py index 340c1614..e7de2036 100644 --- a/doit/runner.py +++ b/doit/runner.py @@ -1,12 +1,12 @@ """Task runner""" -from multiprocessing import Process, Queue as MQueue from threading import Thread import pickle import queue import cloudpickle +from .compat import is_multiprocessing_available, MQueue, Process from .exceptions import InvalidTask, CatchedException from .exceptions import TaskFailed, SetupError, DependencyError, UnmetDependency from .task import Stream, DelayedLoaded @@ -327,16 +327,7 @@ class MRunner(Runner): @staticmethod def available(): """check if multiprocessing module is available""" - # see: https://bitbucket.org/schettino72/doit/issue/17 - # http://bugs.python.org/issue3770 - # not available on BSD systens - try: - import multiprocessing.synchronize - multiprocessing # pyflakes - except ImportError: # pragma: no cover - return False - else: - return True + return is_multiprocessing_available() def __init__(self, dep_manager, reporter, continue_=False, always_execute=False, diff --git a/tests/test_cmd_auto.py b/tests/test_cmd_auto.py index 9bc317aa..61012479 100644 --- a/tests/test_cmd_auto.py +++ b/tests/test_cmd_auto.py @@ -1,8 +1,8 @@ import time -from multiprocessing import Process import pytest +from doit.compat import Process from doit.cmdparse import DefaultUpdate from doit.task import Task from doit.cmd_base import TaskLoader diff --git a/tests/test_runner.py b/tests/test_runner.py index af34ffa3..f463b7f5 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -1,12 +1,12 @@ import os import sys import pickle -from multiprocessing import Queue import platform from unittest.mock import Mock import pytest +from doit.compat import MQueue from doit.exceptions import CatchedException, InvalidTask from doit.dependency import DbmDB, Dependency from doit.reporter import ConsoleReporter @@ -557,7 +557,7 @@ def testSystemExitRaises(self, reporter, RunnerClass, dep_manager): class TestMReporter(object): class MyRunner(object): def __init__(self): - self.result_q = Queue() + self.result_q = MQueue() def testReporterMethod(self, reporter): fake_runner = self.MyRunner() @@ -696,8 +696,8 @@ def test_all_processes(self, reporter, monkeypatch, dep_manager): td = TaskDispatcher({'t1':t1, 't2':t2}, [], ['t1', 't2']) run = runner.MRunner(dep_manager, reporter, num_process=2) run._run_tasks_init(td) - result_q = Queue() - task_q = Queue() + result_q = MQueue() + task_q = MQueue() proc_list = run._run_start_processes(task_q, result_q) run.finish() @@ -714,8 +714,8 @@ def test_less_processes(self, reporter, monkeypatch, dep_manager): td = TaskDispatcher({'t1':t1}, [], ['t1']) run = runner.MRunner(dep_manager, reporter, num_process=2) run._run_tasks_init(td) - result_q = Queue() - task_q = Queue() + result_q = MQueue() + task_q = MQueue() proc_list = run._run_start_processes(task_q, result_q) run.finish() @@ -732,8 +732,8 @@ def test_waiting_process(self, reporter, monkeypatch, dep_manager): td = TaskDispatcher({'t1':t1, 't2':t2}, [], ['t1', 't2']) run = runner.MRunner(dep_manager, reporter, num_process=2) run._run_tasks_init(td) - result_q = Queue() - task_q = Queue() + result_q = MQueue() + task_q = MQueue() proc_list = run._run_start_processes(task_q, result_q) run.finish() @@ -779,10 +779,10 @@ def test_task_not_picklabe_thread(self, reporter, dep_manager): class TestMRunner_execute_task(object): def test_hold(self, reporter, dep_manager): run = runner.MRunner(dep_manager, reporter) - task_q = Queue() + task_q = MQueue() task_q.put(runner.JobHold()) # to test task_q.put(None) # to terminate function - result_q = Queue() + result_q = MQueue() run.execute_task_subprocess(task_q, result_q, reporter.__class__) run.finish() # nothing was done @@ -792,10 +792,10 @@ def test_full_task(self, reporter, dep_manager): # test execute_task_subprocess can receive a full Task object run = runner.MRunner(dep_manager, reporter) t1 = Task('t1', [simple_result]) - task_q = Queue() + task_q = MQueue() task_q.put(runner.JobTask(t1)) # to test task_q.put(None) # to terminate function - result_q = Queue() + result_q = MQueue() run.execute_task_subprocess(task_q, result_q, reporter.__class__) run.finish() # check result @@ -812,10 +812,10 @@ def test_full_task_fail(self, reporter, dep_manager): # test execute_task_subprocess can receive a full Task object run = runner.MRunner(dep_manager, reporter) t1 = Task('t1', [simple_fail]) - task_q = Queue() + task_q = MQueue() task_q.put(runner.JobTask(t1)) # to test task_q.put(None) # to terminate function - result_q = Queue() + result_q = MQueue() run.execute_task_subprocess(task_q, result_q, reporter.__class__) run.finish() # check result