Skip to content

Commit

Permalink
Merge pull request #9 from swansonk14/refactor
Browse files Browse the repository at this point in the history
Fixed hanging pool, add typing, overall refactor
  • Loading branch information
swansonk14 authored Dec 28, 2019
2 parents 4841896 + 656c7f2 commit f655abb
Show file tree
Hide file tree
Showing 11 changed files with 400 additions and 499 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ gifs
p_tqdm.egg-info
__pycache__
*.pyc
.idea
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
language: python
python:
- "2.7"
- "3.4"
- "3.5"
- "3.6"
- "3.7"
- "3.8"
install:
- pip install -r requirements.txt
- pip install -e .
script:
- nosetests
2 changes: 1 addition & 1 deletion LICENSE.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Copyright (c) 2017 Kyle Swanson
Copyright (c) 2019 Kyle Swanson

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# p_tqdm

[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/p_tqdm)](https://badge.fury.io/py/p_tqdm)
[![PyPI version](https://badge.fury.io/py/p_tqdm.svg)](https://badge.fury.io/py/p_tqdm)
[![Build Status](https://travis-ci.org/swansonk14/p_tqdm.svg?branch=master)](https://travis-ci.org/swansonk14/p_tqdm)

`p_tqdm` makes parallel processing with progress bars easy.
Expand All @@ -10,8 +12,6 @@

```pip install p_tqdm```

`p_tqdm` works with Python versions 2.7, 3.4, 3.5, 3.6.

## Example

Let's say you want to add two lists element by element. Without any parallelism, this can be done easily with a Python `map`.
Expand Down
177 changes: 10 additions & 167 deletions p_tqdm/__init__.py
Original file line number Diff line number Diff line change
@@ -1,167 +1,10 @@
"""Map functions with tqdm progress bars for parallel and sequential processing.
p_map: Performs a parallel ordered map.
p_imap: Returns an iterator for a parallel ordered map.
p_umap: Performs a parallel unordered map.
p_uimap: Returns an iterator for a parallel unordered map.
t_map: Performs a sequential map.
t_imap: Returns an iterator for a sequential map.
"""

from pathos.helpers import cpu_count
from pathos.multiprocessing import ProcessingPool as Pool
from tqdm.auto import tqdm

def _parallel(ordered, function, *arrays, **kwargs):
"""Returns an iterator for a parallel map with a progress bar.
Arguments:
ordered(bool): True for an ordered map, false for an unordered map.
function(function): The function to apply to each element
of the given arrays.
arrays(tuple): One or more arrays of the same length
containing the data to be mapped. If a non-list
variable is passed, it will be repeated a number
of times equal to the lengths of the list(s). If only
non-list variables are passed, the function will be
performed num_iter times.
num_cpus(int): The number of cpus to use in parallel.
If an int, uses that many cpus.
If a float, uses that proportion of cpus.
If None, uses all available cpus.
num_iter(int): If only non-list variables are passed, the
function will be performed num_iter times on
these variables. Default: 1.
Returns:
An iterator which will apply the function
to each element of the given arrays in
parallel in order with a progress bar.
"""

# Convert tuple to list
arrays = list(arrays)

# Extract kwargs
num_cpus = kwargs.pop('num_cpus', None)
num_iter = kwargs.pop('num_iter', 1)

# Determine num_cpus
if num_cpus is None:
num_cpus = cpu_count()
elif type(num_cpus) == float:
num_cpus = int(round(num_cpus * cpu_count()))

# Determine num_iter when at least one list is present
if any([type(array) == list for array in arrays]):
num_iter = max([len(array) for array in arrays if type(array) == list])

# Convert single variables to lists
# and confirm lists are same length
for i, array in enumerate(arrays):
if type(array) != list:
arrays[i] = [array for _ in range(num_iter)]
else:
assert len(array) == num_iter

# Create parallel iterator
map_type = 'imap' if ordered else 'uimap'
iterator = tqdm(getattr(Pool(num_cpus), map_type)(function, *arrays),
total=num_iter, **kwargs)

return iterator

def p_map(function, *arrays, **kwargs):
"""Performs a parallel ordered map with a progress bar."""

ordered = True
iterator = _parallel(ordered, function, *arrays, **kwargs)
result = list(iterator)

return result

def p_imap(function, *arrays, **kwargs):
"""Returns an iterator for a parallel ordered map with a progress bar."""

ordered = True
iterator = _parallel(ordered, function, *arrays, **kwargs)

return iterator

def p_umap(function, *arrays, **kwargs):
"""Performs a parallel unordered map with a progress bar."""

ordered = False
iterator = _parallel(ordered, function, *arrays, **kwargs)
result = list(iterator)

return result

def p_uimap(function, *arrays, **kwargs):
"""Returns an iterator for a parallel unordered map with a progress bar."""

ordered = False
iterator = _parallel(ordered, function, *arrays, **kwargs)

return iterator

def _sequential(function, *arrays, **kwargs):
"""Returns an iterator for a sequential map with a progress bar.
Arguments:
function(function): The function to apply to each element
of the given arrays.
arrays(tuple): One or more arrays of the same length
containing the data to be mapped. If a non-list
variable is passed, it will be repeated a number
of times equal to the lengths of the list(s). If only
non-list variables are passed, the function will be
performed num_iter times.
num_iter(int): If only non-list variables are passed, the
function will be performed num_iter times on
these variables. Default: 1.
Returns:
An iterator which will apply the function
to each element of the given arrays sequentially
in order with a progress bar.
"""

# Convert tuple to list
arrays = list(arrays)

# Extract kwargs
num_iter = kwargs.pop('num_iter', 1)

# Determine num_iter when at least one list is present
if any([type(array) == list for array in arrays]):
num_iter = max([len(array) for array in arrays if type(array) == list])

# Convert single variables to lists
# and confirm lists are same length
for i, array in enumerate(arrays):
if type(array) != list:
arrays[i] = [array for _ in range(num_iter)]
else:
assert len(array) == num_iter

# Create parallel iterator
iterator = tqdm(map(function, *arrays),
total=num_iter, **kwargs)

return iterator

def t_map(function, *arrays, **kwargs):
"""Performs a sequential map with a progress bar."""

iterator = _sequential(function, *arrays, **kwargs)
result = list(iterator)

return result

def t_imap(function, *arrays, **kwargs):
"""Returns an iterator for a sequential map with a progress bar."""

iterator = _sequential(function, *arrays, **kwargs)

return iterator
from p_tqdm.p_tqdm import p_map, p_imap, p_umap, p_uimap, t_map, t_imap

__all__ = [
'p_map',
'p_imap',
'p_umap',
'p_uimap',
't_map',
't_imap'
]
178 changes: 178 additions & 0 deletions p_tqdm/p_tqdm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
"""Map functions with tqdm progress bars for parallel and sequential processing.
p_map: Performs a parallel ordered map.
p_imap: Returns an iterator for a parallel ordered map.
p_umap: Performs a parallel unordered map.
p_uimap: Returns an iterator for a parallel unordered map.
t_map: Performs a sequential map.
t_imap: Returns an iterator for a sequential map.
"""

from typing import Any, Callable, Generator

from pathos.helpers import cpu_count
from pathos.multiprocessing import ProcessPool as Pool
from tqdm.auto import tqdm


def _parallel(ordered: bool, function: Callable, *arrays: list, **kwargs: Any) -> Generator:
"""Returns a generator for a parallel map with a progress bar.
Arguments:
ordered(bool): True for an ordered map, false for an unordered map.
function(Callable): The function to apply to each element
of the given arrays.
arrays(Tuple[list]): One or more arrays of the same length
containing the data to be mapped. If a non-list
variable is passed, it will be repeated a number
of times equal to the lengths of the list(s). If only
non-list variables are passed, the function will be
performed num_iter times.
num_cpus(int): The number of cpus to use in parallel.
If an int, uses that many cpus.
If a float, uses that proportion of cpus.
If None, uses all available cpus.
num_iter(int): If only non-list variables are passed, the
function will be performed num_iter times on
these variables. Default: 1.
Returns:
A generator which will apply the function
to each element of the given arrays in
parallel in order with a progress bar.
"""

# Convert tuple to list
arrays = list(arrays)

# Extract kwargs
num_cpus = kwargs.pop('num_cpus', None)
num_iter = kwargs.pop('num_iter', 1)

# Determine num_cpus
if num_cpus is None:
num_cpus = cpu_count()
elif type(num_cpus) == float:
num_cpus = int(round(num_cpus * cpu_count()))

# Determine num_iter when at least one list is present
if any([type(array) == list for array in arrays]):
num_iter = max([len(array) for array in arrays if type(array) == list])

# Convert single variables to lists
# and confirm lists are same length
for i, array in enumerate(arrays):
if type(array) != list:
arrays[i] = [array for _ in range(num_iter)]
else:
assert len(array) == num_iter

# Create parallel generator
map_type = 'imap' if ordered else 'uimap'
pool = Pool(num_cpus)
map_func = getattr(pool, map_type)

for item in tqdm(map_func(function, *arrays), total=num_iter, **kwargs):
yield item

pool.clear()


def p_map(function: Callable, *arrays: list, **kwargs: Any) -> list:
"""Performs a parallel ordered map with a progress bar."""

ordered = True
iterator = _parallel(ordered, function, *arrays, **kwargs)
result = list(iterator)

return result


def p_imap(function: Callable, *arrays: list, **kwargs: Any) -> Generator:
"""Returns an iterator for a parallel ordered map with a progress bar."""

ordered = True
iterator = _parallel(ordered, function, *arrays, **kwargs)

return iterator


def p_umap(function: Callable, *arrays: list, **kwargs: Any) -> list:
"""Performs a parallel unordered map with a progress bar."""

ordered = False
iterator = _parallel(ordered, function, *arrays, **kwargs)
result = list(iterator)

return result


def p_uimap(function: Callable, *arrays: list, **kwargs: Any) -> Generator:
"""Returns an iterator for a parallel unordered map with a progress bar."""

ordered = False
iterator = _parallel(ordered, function, *arrays, **kwargs)

return iterator


def _sequential(function: Callable, *arrays: list, **kwargs: Any) -> Generator:
"""Returns a generator for a sequential map with a progress bar.
Arguments:
function(Callable): The function to apply to each element
of the given arrays.
arrays(Tuple[list]): One or more arrays of the same length
containing the data to be mapped. If a non-list
variable is passed, it will be repeated a number
of times equal to the lengths of the list(s). If only
non-list variables are passed, the function will be
performed num_iter times.
num_iter(int): If only non-list variables are passed, the
function will be performed num_iter times on
these variables. Default: 1.
Returns:
A generator which will apply the function
to each element of the given arrays sequentially
in order with a progress bar.
"""

# Convert tuple to list
arrays = list(arrays)

# Extract kwargs
num_iter = kwargs.pop('num_iter', 1)

# Determine num_iter when at least one list is present
if any([type(array) == list for array in arrays]):
num_iter = max([len(array) for array in arrays if type(array) == list])

# Convert single variables to lists
# and confirm lists are same length
for i, array in enumerate(arrays):
if type(array) != list:
arrays[i] = [array for _ in range(num_iter)]
else:
assert len(array) == num_iter

# Create sequential generator
for item in tqdm(map(function, *arrays), total=num_iter, **kwargs):
yield item


def t_map(function: Callable, *arrays: list, **kwargs: Any) -> list:
"""Performs a sequential map with a progress bar."""

iterator = _sequential(function, *arrays, **kwargs)
result = list(iterator)

return result


def t_imap(function: Callable, *arrays: list, **kwargs: Any) -> Generator:
"""Returns an iterator for a sequential map with a progress bar."""

iterator = _sequential(function, *arrays, **kwargs)

return iterator
Empty file removed p_tqdm/tests/__init__.py
Empty file.
Loading

0 comments on commit f655abb

Please sign in to comment.