Runner
Runner
¶
Run functions in parallel with joblib.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `backend` | | Choice of parallelism backend: `"loky"`, `"multiprocessing"` or `"threading"`. | required |
| `n_jobs` | | Degree of parallelism; set to `-1` to use all available cores. | required |
All keyword arguments passed during instantiation are forwarded to `parallel_backend`.
More information on joblib can be found here.
Joblib can also attach to third-party backends such as Ray or Apache Spark;
however, that functionality has not yet been tested.
Usage:
from memo import Runner
runner = Runner(backend='threading', n_jobs=2)
run(self, func, settings, progbar=True)
¶
Run a function with joblib's parallel backend.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `func` | `Callable` | The function to be run in parallel. | required |
| `settings` | `Iterable[Dict]` | An iterable of key-value pairs (a list, tuple, set or generator). | required |
| `progbar` | `bool` | Show progress bar. Defaults to True. | `True` |
Exceptions:
| Type | Description |
|---|---|
| `TypeError` | When `settings` is not a list, tuple, set or generator. |
Usage:
```python
from memo import Runner
import numpy as np
from memo import memlist, grid, time_taken

data = []

@memlist(data=data)
@time_taken()
def birthday_experiment(class_size, n_sim):
    sims = np.random.randint(1, 365 + 1, (n_sim, class_size))
    sort_sims = np.sort(sims, axis=1)
    n_uniq = (sort_sims[:, 1:] != sort_sims[:, :-1]).sum(axis=1) + 1
    proba = np.mean(n_uniq != class_size)
    return {"est_proba": proba}

settings = grid(class_size=range(20, 30), n_sim=[100, 10_000], progbar=False)

# To Run in parallel
runner = Runner(backend="threading", n_jobs=-1)
runner.run(func=birthday_experiment, settings=settings)
```
Source code in memo/_runner.py
def run(
    self, func: Callable, settings: Iterable[Dict], progbar: bool = True
) -> None:
    """Run a function with joblib's parallel backend.

    Args:
        func (Callable): The function to be run in parallel.
        settings (Iterable): An Iterable of key-value pairs. Must be a
            list, tuple, set or generator.
        progbar (bool, optional): Show progress bar. Defaults to True.
            The bar needs ``len(settings)``, so it is not available for
            generator settings (a warning is emitted instead).

    Raises:
        TypeError: When ``settings`` is not a list, tuple, set or generator.

    Usage:

    ```python
    from memo import Runner
    import numpy as np
    from memo import memlist, grid, time_taken

    data = []

    @memlist(data=data)
    @time_taken()
    def birthday_experiment(class_size, n_sim):
        sims = np.random.randint(1, 365 + 1, (n_sim, class_size))
        sort_sims = np.sort(sims, axis=1)
        n_uniq = (sort_sims[:, 1:] != sort_sims[:, :-1]).sum(axis=1) + 1
        proba = np.mean(n_uniq != class_size)
        return {"est_proba": proba}

    settings = grid(class_size=range(20, 30), n_sim=[100, 10_000], progbar=False)

    # To Run in parallel
    runner = Runner(backend="threading", n_jobs=-1)
    runner.run(func=birthday_experiment, settings=settings)
    ```
    """
    if not isinstance(settings, (list, tuple, set, GeneratorType)):
        raise TypeError(f"Type {type(settings)} not supported")

    if progbar and not isinstance(settings, GeneratorType):
        # Sized containers: the total is known up front, so a bar can be shown.
        total = len(settings)
        with Progress() as progress:
            task = progress.add_task("[red]Runner....", total=total)

            class BatchCompletionCallBack(object):
                # Replacement for joblib's batch-completion callback that also
                # advances the progress bar after each finished batch.
                # NOTE(review): this relies on joblib internals
                # (n_completed_tasks, _backend, _lock, _original_iterator,
                # dispatch_next, print_progress) whose names may change between
                # joblib versions — confirm against the pinned version.
                def __init__(self, dispatch_timestamp, batch_size, parallel):
                    self.dispatch_timestamp = dispatch_timestamp
                    self.batch_size = batch_size
                    self.parallel = parallel

                def __call__(self, out):
                    self.parallel.n_completed_tasks += self.batch_size
                    this_batch_duration = time.time() - self.dispatch_timestamp
                    self.parallel._backend.batch_completed(
                        self.batch_size, this_batch_duration
                    )
                    self.parallel.print_progress()
                    # Update progress bar
                    progress.update(
                        task,
                        completed=self.parallel.n_completed_tasks,
                        refresh=True,
                    )
                    # Keep feeding work to the pool, as joblib's own callback does.
                    with self.parallel._lock:
                        if self.parallel._original_iterator is not None:
                            self.parallel.dispatch_next()

            # Monkey patch joblib for the duration of this run only. The
            # original callback is always restored; otherwise every later
            # joblib call in the process (including progbar=False runs) would
            # keep updating this already-closed progress bar.
            original_callback = joblib.parallel.BatchCompletionCallBack
            joblib.parallel.BatchCompletionCallBack = BatchCompletionCallBack
            try:
                self._run(func, settings)
            finally:
                joblib.parallel.BatchCompletionCallBack = original_callback
    else:
        # Warn only when a progress bar was actually requested but cannot be
        # shown (a generator has no length).
        if progbar and isinstance(settings, GeneratorType):
            warnings.warn("Progress bar not supported for generator settings")
        self._run(func, settings)