Skip to content

Commit

Permalink
Merge pull request #45 from flennerhag/dev
Browse files Browse the repository at this point in the history
[MRG 0.1.5.2] printed messages + clear_cache
  • Loading branch information
flennerhag authored Jul 27, 2017
2 parents 8aa6eb4 + a138892 commit 87c039c
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 54 deletions.
6 changes: 5 additions & 1 deletion docs/updates.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,9 @@ Change log
- Minor bug fixes
- Refactored backend for streamlined front-end feature development

* 07/2017 Release_ of version 0.1.5.1 and 0.1.5.2
- Bug fixes
- ```clear_cache`` function to check for residual caches. Safeguard against old caches not being killed.

.. _Release: https://github.com/flennerhag/mlens/releases
.. _Feature propagation:
.. _Feature propagation:
15 changes: 8 additions & 7 deletions mlens/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@
"""
# Initialize configurations
import mlens.config
from mlens.config import clear_cache

__version__ = "0.1.5.dev0"

__version__ = "0.1.5.2"

__all__ = ['base',
'ensemble',
'externals',
'utils',
'metrics',
'model_selection',
'parallel',
'ensemble',
'externals',
'visualization',
'preprocessing',
'utils',
'visualization']
'model_selection',
]
37 changes: 37 additions & 0 deletions mlens/base/tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,27 @@
Nosetests for :class:`mlens.base`
"""

import os
import subprocess
import numpy as np

from mlens import config
from mlens.base import (IdTrain,
FoldIndex,
BlendIndex,
SubsetIndex,
ClusteredSubsetIndex,
FullIndex)
from mlens.base.indexer import _partition, _prune_train
try:
from contextlib import redirect_stderr
except ImportError:
from mlens.externals.fixes import redirect as redirect_stderr

X = np.arange(25).reshape(5, 5)

tmpdir = config.TMPDIR


class ClusterEstimator(object):

Expand Down Expand Up @@ -47,6 +56,34 @@ def predict(self, X):
cl_2 = ClusterEstimator(2)


###############################################################################
def test_set_dir():
"""[Base] Test setting temp dir."""
before = config.TMPDIR

config.set_tmpdir(os.getcwd())

after = config.TMPDIR

assert before != after


def test_check_cache():
"""[Base] Test check cache."""
tmp = config.PREFIX + "test"
os.mkdir(tmp)
with open(os.devnull, 'w') as f, redirect_stderr(f):
subprocess.Popen("echo this is a test >> " + tmp +
"/test.txt", shell=True)
config.clear_cache(config.TMPDIR)


def test_reset_dir():
"""[Base] Test resetting temp dir."""
config.set_tmpdir(tmpdir)
assert config.TMPDIR == tmpdir


###############################################################################
def test_id_train():
"""[Base] Test IdTrain class for checking training and test matrices."""
Expand Down
76 changes: 71 additions & 5 deletions mlens/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@
import os
import sys
import numpy
import shutil
import tempfile
import warnings
import sysconfig

import subprocess
from multiprocessing import current_process

###############################################################################
# Variables

DTYPE = getattr(numpy, os.environ.get('MLENS_DTYPE', 'float32'))
TMPDIR = os.environ.get('MLENS_TMPDIR', tempfile.gettempdir())
PREFIX = os.environ.get('MLENS_PREFIX', ".mlens_tmp_cache_")
BACKEND = os.environ.get('MLENS_BACKEND', 'multiprocessing')
START_METHOD = os.environ.get('MLENS_START_METHOD', '')

Expand All @@ -24,6 +28,7 @@
###############################################################################
# Configuration calls


def set_tmpdir(tmp):
"""Set the root directory for temporary caches during estimation.
Expand All @@ -36,6 +41,18 @@ def set_tmpdir(tmp):
TMPDIR = tmp


def set_prefix(prefix):
"""Set the prefix assigned to temporary directories during estimation.
Parameters
----------
prefix : str
cache file name prefix
"""
global PREFIX
PREFIX = prefix


def set_dtype(dtype):
"""Set the dtype to use during estimation.
Expand Down Expand Up @@ -85,16 +102,65 @@ def __get_default_start_method(method):
if new_python:
# Use forkserver for unix and spawn for windows
# Travis currently stalling on OSX, use 'spawn' until investigated
# method = 'forkserver' if not win else 'spawn'
method = 'spawn'
method = 'spawn' if win else 'spawn'
else:
# Use fork (multiprocessing default)
method = 'fork'
return method

###############################################################################
# Handlers


def clear_cache(tmp):
""" Check that cache directory is empty.
Checks that a specified directory do not contain any directories with
the ML-Ensemble temporary cache signature. Attempts to remove any found
directories.
Parameters
----------
tmp : str
the directory to check for residual caches in.
"""
global PREFIX
residuals = [i for i in os.walk(tmp)
if os.path.split(i[0])[-1].startswith(PREFIX)]

n = len(residuals)
if n > 0:
print("[MLENS] Found %i residual cache(s):" % n, file=sys.stderr)

size = 0
for i, res in enumerate(residuals):
s = os.path.getsize(res[0])
size += s

print(" %i (%i): %s" % (i + 1, s, res[0]), file=sys.stderr)

print(" Total size: %i\n[MLENS] Removing..." % size,
end=" ", file=sys.stderr)

try:
for res in residuals:
shutil.rmtree(res[0])
print("done.", file=sys.stderr)

except OSError:
# Can fail on windows, need to use the shell
try:
for res in residuals:
subprocess.Popen('rmdir /S /Q %s' % res[0],
shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
except OSError:
warnings.warn("Failed to delete cache at %s." % res[0])

###############################################################################
# Set up

START_METHOD = __get_default_start_method(START_METHOD)
set_start_method(START_METHOD)
if current_process().name == 'MainProcess':
START_METHOD = __get_default_start_method(START_METHOD)
set_start_method(START_METHOD)
clear_cache(TMPDIR)
103 changes: 72 additions & 31 deletions mlens/ensemble/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,40 @@
from time import time


def print_job(lc, start_message):
"""Print job details.
Parameters
----------
lc : :class:`LayerContainer`
The LayerContainer instance running the job.
start_message : str
Initial message.
"""
pout = "stdout" if lc.verbose >= 50 else "stderr"

if lc.verbose:
safe_print("\n%s %d layers" % (start_message, lc.n_layers),
file=pout, flush=True, end="\n\n")

if lc.verbose >= 10:
safe_print("""\n
Job description
-------------------------
n_jobs = %i
backend = %r
start_method = %r
cache = %r
""" % (lc.n_jobs, lc.backend, config.START_METHOD, config.TMPDIR),
file=pout, flush=True, end="\n\n")

t0 = time()
return pout, t0

###############################################################################


class LayerContainer(BaseEstimator):

r"""Container class for layers.
Expand Down Expand Up @@ -221,11 +255,7 @@ def fit(self, X=None, y=None, return_preds=None, **process_kwargs):
X : array-like, optional
predictions from final layer's ``fit_proba`` call.
"""
if self.verbose:
pout = "stdout" if self.verbose >= 3 else "stderr"
safe_print("Fitting layers (%d)" % self.n_layers,
file=pout, flush=True, end="\n\n")
t0 = time()
pout, t0 = print_job(self, "Fitting")

# Initialize cache
processor = ParallelProcessing(self)
Expand Down Expand Up @@ -266,7 +296,14 @@ def predict(self, X=None, *args, **kwargs):
X_pred : array-like of shape = [n_samples, n_fitted_estimators]
predictions from final layer.
"""
return self._predict(X, 'predict', *args, **kwargs)
pout, t0 = print_job(self, "Predicting with")

out = self._predict(X, 'predict', *args, **kwargs)

if self.verbose:
print_time(t0, "Prediction complete", file=pout, flush=True)

return out

def transform(self, X=None, *args, **kwargs):
"""Generic method for reproducing predictions of the ``fit`` call.
Expand All @@ -287,7 +324,18 @@ def transform(self, X=None, *args, **kwargs):
X_pred : array-like of shape = [n_test_samples, n_fitted_estimators]
predictions from ``fit`` call to final layer.
"""
return self._predict(X, 'transform', *args, **kwargs)
if self.verbose:
pout = "stdout" if self.verbose >= 3 else "stderr"
safe_print("Transforming layers (%d)" % self.n_layers,
file=pout, flush=True, end="\n\n")
t0 = time()

out = self._predict(X, 'transform', *args, **kwargs)

if self.verbose:
print_time(t0, "Transform complete", file=pout, flush=True)

return out

def _predict(self, X, job, *args, **kwargs):
r"""Generic for processing a predict job through all layers.
Expand All @@ -307,25 +355,15 @@ def _predict(self, X, job, *args, **kwargs):
or new predictions on X using base learners fitted on all training
data.
"""
if self.verbose:
pout = "stdout" if self.verbose >= 3 else "stderr"
safe_print("Processing layers (%d)" % self.n_layers,
file=pout, flush=True, end="\n\n")
t0 = time()

# Initialize cache
processor = ParallelProcessing(self)
processor.initialize(job, X, *args, **kwargs)

# Predict with ensemble
try:
processor.process()

preds = processor.get_preds()

if self.verbose:
print_time(t0, "Done", file=pout, flush=True)

finally:
# Always terminate job manager unless user explicitly initialized
processor.terminate()
Expand Down Expand Up @@ -631,6 +669,8 @@ def get_params(self, deep=True):

return out

###############################################################################


class BaseEnsemble(BaseEstimator):

Expand Down Expand Up @@ -695,31 +735,32 @@ def _add(self,
# Check if a Layer Container instance is initialized
if getattr(self, 'layers', None) is None:
self.layers = LayerContainer(
n_jobs=self.n_jobs,
raise_on_exception=self.raise_on_exception,
backend=self.backend,
verbose=self.verbose)
n_jobs=self.n_jobs,
raise_on_exception=self.raise_on_exception,
backend=self.backend,
verbose=self.verbose)

# Add layer to Layer Container
verbose = kwargs.pop('verbose', self.verbose)
scorer = kwargs.pop('scorer', self.scorer)

if 'proba' in kwargs:
if kwargs['proba'] and scorer is not None:
raise ValueError("Cannot score probability-based predictions."
"Set ensemble attribute 'scorer' to "
"None or layer parameter 'Proba' to False.")

self.layers.add(estimators=estimators,
cls=cls,
indexer=indexer,
preprocessing=preprocessing,
scorer=self.scorer,
scorer=scorer,
verbose=verbose,
**kwargs)

# Check parameter comparability
if 'proba' in kwargs:
scorer = getattr(self, 'scorer', None)
if kwargs['proba'] and scorer:
raise ValueError("Cannot score probability-based predictions."
"Set either ensemble parameter 'scorer' to "
"None or layer parameter 'Proba' to False.")

# Set the layer as an attribute of the ensemble
lyr = list(self.layers.layers)[-1]
attr = lyr.replace('-', '_').replace(' ', '').strip()

setattr(self, attr, self.layers.layers[lyr])

return self
Expand Down
Loading

0 comments on commit 87c039c

Please sign in to comment.