pathos module documentation

abstract_launcher module

This module contains the base classes for pathos pool and pipe objects, and describes the map and pipe interfaces. A pipe is defined as a connection between two ‘nodes’, where a node is something that does work. A pipe may be a one-way or two-way connection. A map is defined as a one-to-many connection between nodes. In both map and pipe connections, results from the connected nodes can be returned to the calling node. There are several variants of pipe and map, such as whether the connection is blocking, or ordered, or asynchronous. Derived pipes must overwrite the ‘pipe’ method, while derived maps must overwrite the ‘map’ method. Pipes and maps are available from worker pool objects, where the work is done by any of the workers in the pool. For more specific point-to-point connections (such as a pipe between two specific compute nodes), use the pipe object directly.

Usage

A typical call to a pathos map will roughly follow this example:

>>> # instantiate and configure the worker pool
>>> from pathos.pools import ProcessPool
>>> pool = ProcessPool(nodes=4)
>>>
>>> # do a blocking map on the chosen function
>>> results = pool.map(pow, [1,2,3,4], [5,6,7,8])
>>>
>>> # do a non-blocking map, then extract the results from the iterator
>>> results = pool.imap(pow, [1,2,3,4], [5,6,7,8])
>>> print("...")
>>> results = list(results)
>>>
>>> # do an asynchronous map, then get the results
>>> results = pool.amap(pow, [1,2,3,4], [5,6,7,8])
>>> import time
>>> while not results.ready():
...     time.sleep(5); print(".", end=' ')
...
>>> results = results.get()

Notes

Each of the pathos worker pools rely on a different transport protocol (e.g. threads, multiprocessing, etc), where the use of each pool comes with a few caveats. See the usage documentation and examples for each worker pool for more information.

class AbstractPipeConnection(*args, **kwds)

Bases: object

AbstractPipeConnection base class for pathos pipes.

Required input:

???

Additional inputs:

???

Important class members:

???

Other class members:

???

__repr__()

Return repr(self).

class AbstractWorkerPool(*args, **kwds)

Bases: object

AbstractWorkerPool base class for pathos pools.

Important class members:

nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler

__enter__()
__exit__(*args)
__get_nodes()

get the number of nodes in the pool

__imap(f, *args, **kwds)

default filter for imap inputs

__init(*args, **kwds)

default filter for __init__ inputs

__map(f, *args, **kwds)

default filter for map inputs

__nodes = 1
__pipe(f, *args, **kwds)

default filter for pipe inputs

__repr__()

Return repr(self).

__set_nodes(nodes)

set the number of nodes in the pool

_serve(*args, **kwds)

Create a new server if one isn’t already initialized

amap(f, *args, **kwds)

run a batch of jobs with an asynchronous map

Returns a results object which contains the results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. To retrieve the results, call the get() method on the returned results object; the call to get() blocks until all results are retrieved. Use the ready() method on the results object to check if all results are ready. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
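
As a brief illustration, the asynchronous protocol looks roughly like the following (a sketch assuming pathos.pools.ProcessPool as the concrete pool; not every pool accepts the chunksize keyword):

>>> from pathos.pools import ProcessPool
>>> pool = ProcessPool(nodes=2)
>>> queued = pool.amap(pow, [1,2,3,4], [5,6,7,8], chunksize=2)
>>> queued.ready()    # may be False while the workers are still busy
>>> queued.get()      # blocks until all results have been retrieved
[1, 64, 2187, 65536]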

apipe(f, *args, **kwds)

submit a job asynchronously to a queue

Returns a results object which contains the result of calling the function f on a selected worker. To retrieve the result, call the get() method on the returned results object; the call to get() blocks until the result is available. Use the ready() method on the results object to check if the result is ready.

clear()

Remove server with matching state

imap(f, *args, **kwds)

run a batch of jobs with a non-blocking and ordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

map(f, *args, **kwds)

run a batch of jobs with a blocking and ordered map

Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

pipe(f, *args, **kwds)

submit a job and block until results are available

Returns result of calling the function f on a selected worker. This function will block until results are available.

uimap(f, *args, **kwds)

run a batch of jobs with a non-blocking and unordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. The order of the resulting sequence is not guaranteed. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

connection module

This module contains the base class for popen pipes, and describes the popen pipe interface. The ‘config’ method can be overwritten for pipe customization. The pipe’s ‘launch’ method can be overwritten with a derived pipe’s new execution algorithm. See the following for an example of standard use.

Usage

A typical call to a popen ‘pipe’ will roughly follow this example:

>>> # instantiate the pipe
>>> pipe = Pipe()
>>>
>>> # configure the pipe to stage the command
>>> pipe(command='hostname')
>>>
>>> # execute the launch and retrieve the response
>>> pipe.launch()
>>> print(pipe.response())
class Pipe(name=None, **kwds)

Bases: object

a popen-based pipe for parallel and distributed computing

create a popen-pipe

Inputs:

name: a unique identifier (string) for the pipe
command: a command to send [default = ‘echo <name>’]
background: run in background [default = False]
decode: ensure response is ‘ascii’ [default = True]
stdin: file-like object to serve as standard input for the remote process

__call__(**kwds)

configure the pipe using given keywords

(Re)configure the pipe for the following inputs:

command: a command to send [default = ‘echo <name>’]
background: run in background [default = False]
decode: ensure response is ‘ascii’ [default = True]
stdin: file-like object to serve as standard input for the remote process

__repr__()

Return repr(self).

_debug = <Logger pathos (WARNING)>
_execute()
config(**kwds)

configure the pipe using given keywords

(Re)configure the pipe for the following inputs:

command: a command to send [default = ‘echo <name>’]
background: run in background [default = False]
decode: ensure response is ‘ascii’ [default = True]
stdin: file-like object to serve as standard input for the remote process

kill()

terminate the pipe

launch()

launch a configured command

pid()

get pipe pid

response()

Return the response from the launched process. Return None if no response was received yet from a background process.
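
For example, a background launch may not have a response immediately (a sketch following the usage pattern above; ‘hostname’ is just an illustrative command):

>>> pipe = Pipe()
>>> pipe(command='hostname', background=True)
>>> pipe.launch()
>>> pipe.response()    # None until the background process has responded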

verbose = True
exception PipeException

Bases: Exception

Exception for failure to launch a command

core module

high-level programming interface to core pathos utilities

connect(host, port=None, through=None)

establish a secure tunnel connection to a remote host at the given port

Parameters:
  • host – hostname to which a tunnel should be established

  • port – port number

  • through – 'tunnel-through' hostname [default = None]
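
A minimal sketch of typical use (the hostnames and port are placeholders, and the attributes of the returned tunnel object are not documented above):

>>> from pathos.core import connect
>>> tunnel = connect('remote.host.edu', port=12345, through='gateway.host.edu')
>>> # ... use the tunnel, then shut it down when finished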

copy(source, destination=None, **kwds)

copy source to (possibly) remote destination

Execute a copy, and return the copier. Use ‘kill’ to kill the copier, and ‘pid’ to get the process id for the copier.

Parameters:
  • 'file' (source -- path string of source) –

  • target (destination -- path string for destination) –
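
For example (a sketch; the paths are placeholders, and the 'host:path' form for a remote destination is an assumption based on typical scp-style usage):

>>> from pathos.core import copy
>>> copier = copy('./data.txt', destination='remote.host.edu:~/data.txt')
>>> copier.pid()    # process id of the copier, as described above
>>> copier.kill()   # terminate the copier if it is no longer needed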

execute(command, host=None, bg=True, **kwds)

execute a command (possibly) on a remote host

Execute a process, and return the launcher. Use ‘response’ to retrieve the response from the executed command. Use ‘kill’ to kill the launcher, and ‘pid’ to get the process id for the launcher.

Parameters:
  • command – command string to be executed

  • host – hostname of execution target [default = None]

  • bg – run as background process? [default = True]
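
For example (a sketch; the remote hostname is a placeholder, and a local command can be run by leaving host as None):

>>> from pathos.core import execute
>>> launcher = execute('hostname', host='remote.host.edu')
>>> print(launcher.response())
>>> launcher = execute('echo hello', host=None)
>>> print(launcher.response())
hello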

getchild(pid=None, host=None, group=False)

get all child process ids for the given parent process id (ppid)

If pid is None, the pid of the __main__ python instance will be used.

Parameters:
  • group – get process ids for the parent group id

  • host – hostname where process is running

getpid(target=None, host=None, all=False, **kwds)

get the process id for a target process (possibly) running on remote host

This method should only be used as a last-ditch effort to find a process id. This method __may__ work when a child has been spawned and the pid was not registered… but there’s no guarantee.

If target is None, then get the process id of the __main__ python instance.

Parameters:
  • target – string name of target process

  • host – hostname where process is running

  • all – get all resulting lines from query? [default = False]

getppid(pid=None, host=None, group=False)

get parent process id (ppid) for the given process

If pid is None, the pid of the __main__ python instance will be used.

Parameters:
  • group – get parent group id

  • host – hostname where process is running

kill(pid, host=None, **kwds)

kill a process (possibly) on a remote host

Parameters:
  • pid – process id

  • host – hostname where process is running [default = None]
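
Taken together, the process-management helpers above can be used roughly as follows (a sketch; 'ppserver' and the hostname are illustrative placeholders, and the exact return types are not documented above):

>>> from pathos.core import getpid, getchild, kill
>>> pid = getpid()              # pid of the __main__ python instance
>>> children = getchild(pid)    # child process ids of this process
>>> target = getpid('ppserver', host='remote.host.edu')
>>> kill(target, host='remote.host.edu')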

randomport(host=None)

select an open port on a (possibly) remote host

Parameters:

host – hostname on which to select an open port

serve(server, host=None, port=None, profile='.bash_profile')

begin serving RPC requests

Parameters:
  • server – name of RPC server (i.e. ‘ppserver’)

  • host – hostname on which a server should be launched

  • port – port number (on host) at which the server will accept requests

  • profile – file to configure the user’s environment [default=’.bash_profile’]
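
For example, to launch a ppserver on a remote host (a sketch; the hostname and port are placeholders):

>>> from pathos.core import serve
>>> serve('ppserver', host='remote.host.edu', port=12345)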

helpers module

ProcessPool

alias of Pool

ThreadPool(processes=None, initializer=None, initargs=())
cpu_count()

Returns the number of CPUs in the system

freeze_support()

Check whether this is a fake forked process in a frozen executable. If so then run code specified by commandline and exit.

shutdown(type=None)

destroy all cached pools (of the given type)

hosts module

high-level programming interface to pathos host registry

_profiles = {}

For example, to register two ‘known’ host profiles:

_profiles = {'foobar.danse.us': '.profile',
             'computer.cacr.caltech.edu': '.cshrc'}

get_profile(rhost, assume=True)

get the default $PROFILE for a remote host

get_profiles()

get $PROFILE for each registered host

register(rhost, profile=None)

register a host and $PROFILE

register_profiles(profiles)

add dict of {‘host’:$PROFILE} to registered host profiles
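
For example (a sketch using the hostnames from the registry example above; the exact contents returned by get_profiles depend on what has been registered):

>>> from pathos import hosts
>>> hosts.register('foobar.danse.us', profile='.profile')
>>> hosts.register_profiles({'computer.cacr.caltech.edu': '.cshrc'})
>>> hosts.get_profile('foobar.danse.us')
'.profile'
>>> hosts.get_profiles()
{'foobar.danse.us': '.profile', 'computer.cacr.caltech.edu': '.cshrc'}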

maps module

maps: stand-alone map-like objects using lazy pool instantiation

class Amap(pool=None, *args, **kwds)

Bases: Map

async map instance with internal lazy pool instantiation

Parameters:
  • pool – pool object (i.e. pathos.pools.ProcessPool)

  • *args – positional arguments for pool initialization

  • **kwds – keyword arguments for pool initialization

  • close – if True, close the pool to any new jobs [Default: False]

  • join – if True, reclaim the pool’s closed workers [Default: False]

  • clear – if True, delete the pool singleton [Default: False]

NOTE: if a pool object is not provided, NotImplemented is returned upon use.

NOTE: pools from both multiprocess and pathos.pools can be used; however, the behavior is slightly different. Pools from both pathos and multiprocess have close and join methods, to close the pool to new jobs and to shut down the pool’s workers. Pools from pathos, however, are launched as singletons, so they also include a clear method that deletes the singleton. In either case, a pool that has been “closed” will throw a ValueError if map is then called, and similarly, a ValueError will be thrown if join is called before a pool is “closed”. The major difference is that if a pathos pool is closed, the map instance cannot run new jobs until “clear” is called, while a new multiprocess pool will be created each time the map is executed. This leads to pathos.pools generally being called with either clear=True or clear=False, while pools from multiprocess use close=True, join=True, or both. Some hierarchical parallel workflows are not allowed and will result in an error being thrown; however, changing close, join, or clear can often remove the error.

__call__(func, *args, **kwds)

instantiate a pool and execute the pool’s async map

Parameters:
  • func – function object to map

  • *args – positional arguments for async map

  • **kwds – keyword arguments for async map

Returns:

results from execution of async map(func, *args, **kwds)

NOTE: initializes a new worker pool with each call
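
As a brief illustration (a sketch assuming pathos.pools.ProcessPool; the pool class and its size are passed for lazy initialization, and clear=True deletes the pool singleton after use, as discussed in the note above):

>>> from pathos.pools import ProcessPool
>>> from pathos.maps import Amap
>>> amap = Amap(ProcessPool, 4, clear=True)
>>> queued = amap(pow, [1,2,3,4], [5,6,7,8])
>>> queued.get()
[1, 64, 2187, 65536]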

class Asmap(pool=None, *args, **kwds)

Bases: Map

async starmap instance with internal lazy pool instantiation

Parameters:
  • pool – pool object (i.e. pathos.pools.ProcessPool)

  • *args – positional arguments for pool initialization

  • **kwds – keyword arguments for pool initialization

  • close – if True, close the pool to any new jobs [Default: False]

  • join – if True, reclaim the pool’s closed workers [Default: False]

  • clear – if True, delete the pool singleton [Default: False]

NOTE: if a pool object is not provided, NotImplemented is returned upon use.

NOTE: pools from both multiprocess and pathos.pools can be used; however, the behavior is slightly different. Pools from both pathos and multiprocess have close and join methods, to close the pool to new jobs and to shut down the pool’s workers. Pools from pathos, however, are launched as singletons, so they also include a clear method that deletes the singleton. In either case, a pool that has been “closed” will throw a ValueError if map is then called, and similarly, a ValueError will be thrown if join is called before a pool is “closed”. The major difference is that if a pathos pool is closed, the map instance cannot run new jobs until “clear” is called, while a new multiprocess pool will be created each time the map is executed. This leads to pathos.pools generally being called with either clear=True or clear=False, while pools from multiprocess use close=True, join=True, or both. Some hierarchical parallel workflows are not allowed and will result in an error being thrown; however, changing close, join, or clear can often remove the error.

__call__(func, *args, **kwds)

instantiate a pool and execute the pool’s async starmap

Parameters:
  • func – function object to map

  • *args – positional arguments for async starmap

  • **kwds – keyword arguments for async starmap

Returns:

results from execution of async starmap(func, *args, **kwds)

NOTE: initializes a new worker pool with each call

class Imap(pool=None, *args, **kwds)

Bases: Map

map iterator with internal lazy pool instantiation

Parameters:
  • pool – pool object (i.e. pathos.pools.ProcessPool)

  • *args – positional arguments for pool initialization

  • **kwds – keyword arguments for pool initialization

  • close – if True, close the pool to any new jobs [Default: False]

  • join – if True, reclaim the pool’s closed workers [Default: False]

  • clear – if True, delete the pool singleton [Default: False]

NOTE: if a pool object is not provided, a builtins.map will be used.

NOTE: pools from both multiprocess and pathos.pools can be used; however, the behavior is slightly different. Pools from both pathos and multiprocess have close and join methods, to close the pool to new jobs and to shut down the pool’s workers. Pools from pathos, however, are launched as singletons, so they also include a clear method that deletes the singleton. In either case, a pool that has been “closed” will throw a ValueError if map is then called, and similarly, a ValueError will be thrown if join is called before a pool is “closed”. The major difference is that if a pathos pool is closed, the map instance cannot run new jobs until “clear” is called, while a new multiprocess pool will be created each time the map is executed. This leads to pathos.pools generally being called with either clear=True or clear=False, while pools from multiprocess use close=True, join=True, or both. Some hierarchical parallel workflows are not allowed and will result in an error being thrown; however, changing close, join, or clear can often remove the error.

__call__(func, *args, **kwds)

instantiate a pool and execute the pool’s map iterator

Parameters:
  • func – function object to map

  • *args – positional arguments for map iterator

  • **kwds – keyword arguments for map iterator

Returns:

results from execution of map(func, *args, **kwds) iterator

NOTE: initializes a new worker pool with each call

class Ismap(pool=None, *args, **kwds)

Bases: Map

starmap iterator with internal lazy pool instantiation

Parameters:
  • pool – pool object (i.e. pathos.pools.ProcessPool)

  • *args – positional arguments for pool initialization

  • **kwds – keyword arguments for pool initialization

  • close – if True, close the pool to any new jobs [Default: False]

  • join – if True, reclaim the pool’s closed workers [Default: False]

  • clear – if True, delete the pool singleton [Default: False]

NOTE: if a pool object is not provided, an itertools.starmap will be used.

NOTE: pools from both multiprocess and pathos.pools can be used; however, the behavior is slightly different. Pools from both pathos and multiprocess have close and join methods, to close the pool to new jobs and to shut down the pool’s workers. Pools from pathos, however, are launched as singletons, so they also include a clear method that deletes the singleton. In either case, a pool that has been “closed” will throw a ValueError if map is then called, and similarly, a ValueError will be thrown if join is called before a pool is “closed”. The major difference is that if a pathos pool is closed, the map instance cannot run new jobs until “clear” is called, while a new multiprocess pool will be created each time the map is executed. This leads to pathos.pools generally being called with either clear=True or clear=False, while pools from multiprocess use close=True, join=True, or both. Some hierarchical parallel workflows are not allowed and will result in an error being thrown; however, changing close, join, or clear can often remove the error.

__call__(func, *args, **kwds)

instantiate a pool and execute the pool’s starmap iterator

Parameters:
  • func – function object to map

  • *args – positional arguments for starmap iterator

  • **kwds – keyword arguments for starmap iterator

Returns:

results from execution of starmap(func, *args, **kwds) iterator

NOTE: initializes a new worker pool with each call

class Map(pool=None, *args, **kwds)

Bases: object

map instance with internal lazy pool instantiation

Parameters:
  • pool – pool object (i.e. pathos.pools.ProcessPool)

  • *args – positional arguments for pool initialization

  • **kwds – keyword arguments for pool initialization

  • close – if True, close the pool to any new jobs [Default: False]

  • join – if True, reclaim the pool’s closed workers [Default: False]

  • clear – if True, delete the pool singleton [Default: False]

NOTE: if a pool object is not provided, a builtins.map will be used with the returned iterator cast to a list.

NOTE: pools from both multiprocess and pathos.pools can be used; however, the behavior is slightly different. Pools from both pathos and multiprocess have close and join methods, to close the pool to new jobs and to shut down the pool’s workers. Pools from pathos, however, are launched as singletons, so they also include a clear method that deletes the singleton. In either case, a pool that has been “closed” will throw a ValueError if map is then called, and similarly, a ValueError will be thrown if join is called before a pool is “closed”. The major difference is that if a pathos pool is closed, the map instance cannot run new jobs until “clear” is called, while a new multiprocess pool will be created each time the map is executed. This leads to pathos.pools generally being called with either clear=True or clear=False, while pools from multiprocess use close=True, join=True, or both. Some hierarchical parallel workflows are not allowed and will result in an error being thrown; however, changing close, join, or clear can often remove the error.

__attr__()
__call__(func, *args, **kwds)

instantiate a pool and execute the pool’s map

Parameters:
  • func – function object to map

  • *args – positional arguments for map

  • **kwds – keyword arguments for map

Returns:

results from execution of map(func, *args, **kwds)

NOTE: initializes a new worker pool with each call

__cls__()
__del__()

shutdown the worker pool and tidy up

property __func__
property __get__
__meth__()
property __self__
clear()

remove pool singleton, if exists

close()

close the map to any new jobs

join()

reclaim the closed workers
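
A sketch of the blocking Map and its pool-management methods (assuming pathos.pools.ProcessPool; the pool size is arbitrary):

>>> from pathos.pools import ProcessPool
>>> from pathos.maps import Map
>>> pmap = Map(ProcessPool, 4)
>>> pmap(pow, [1,2,3,4], [5,6,7,8])
[1, 64, 2187, 65536]
>>> pmap.close()    # close the map to any new jobs
>>> pmap.join()     # reclaim the closed workers
>>> pmap.clear()    # remove the pool singleton, if it exists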

class Smap(pool=None, *args, **kwds)

Bases: Map

starmap instance with internal lazy pool instantiation

Parameters:
  • pool – pool object (i.e. pathos.pools.ProcessPool)

  • *args – positional arguments for pool initialization

  • **kwds – keyword arguments for pool initialization

  • close – if True, close the pool to any new jobs [Default: False]

  • join – if True, reclaim the pool’s closed workers [Default: False]

  • clear – if True, delete the pool singleton [Default: False]

NOTE: if a pool object is not provided, an itertools.starmap will be used with the returned iterator cast to a list.

NOTE: pools from both multiprocess and pathos.pools can be used; however, the behavior is slightly different. Pools from both pathos and multiprocess have close and join methods, to close the pool to new jobs and to shut down the pool’s workers. Pools from pathos, however, are launched as singletons, so they also include a clear method that deletes the singleton. In either case, a pool that has been “closed” will throw a ValueError if map is then called, and similarly, a ValueError will be thrown if join is called before a pool is “closed”. The major difference is that if a pathos pool is closed, the map instance cannot run new jobs until “clear” is called, while a new multiprocess pool will be created each time the map is executed. This leads to pathos.pools generally being called with either clear=True or clear=False, while pools from multiprocess use close=True, join=True, or both. Some hierarchical parallel workflows are not allowed and will result in an error being thrown; however, changing close, join, or clear can often remove the error.

__call__(func, *args, **kwds)

instantiate a pool and execute the pool’s starmap

Parameters:
  • func – function object to map

  • *args – positional arguments for starmap

  • **kwds – keyword arguments for starmap

Returns:

results from execution of starmap(func, *args, **kwds)

NOTE: initializes a new worker pool with each call

class Uimap(pool=None, *args, **kwds)

Bases: Map

unordered map iterator with internal lazy pool instantiation

Parameters:
  • pool – pool object (i.e. pathos.pools.ProcessPool)

  • *args – positional arguments for pool initialization

  • **kwds – keyword arguments for pool initialization

  • close – if True, close the pool to any new jobs [Default: False]

  • join – if True, reclaim the pool’s closed workers [Default: False]

  • clear – if True, delete the pool singleton [Default: False]

NOTE: if a pool object is not provided, NotImplemented is returned upon use.

NOTE: pools from both multiprocess and pathos.pools can be used; however, the behavior is slightly different. Pools from both pathos and multiprocess have close and join methods, to close the pool to new jobs and to shut down the pool’s workers. Pools from pathos, however, are launched as singletons, so they also include a clear method that deletes the singleton. In either case, a pool that has been “closed” will throw a ValueError if map is then called, and similarly, a ValueError will be thrown if join is called before a pool is “closed”. The major difference is that if a pathos pool is closed, the map instance cannot run new jobs until “clear” is called, while a new multiprocess pool will be created each time the map is executed. This leads to pathos.pools generally being called with either clear=True or clear=False, while pools from multiprocess use close=True, join=True, or both. Some hierarchical parallel workflows are not allowed and will result in an error being thrown; however, changing close, join, or clear can often remove the error.

__call__(func, *args, **kwds)

instantiate a pool and execute the pool’s unordered map iterator

Parameters:
  • func – function object to map

  • *args – positional arguments for unordered map iterator

  • **kwds – keyword arguments for unordered map iterator

Returns:

results from execution of unordered map(func, *args, **kwds) iterator

NOTE: initializes a new worker pool with each call

mp_map module

minimal interface to python’s multiprocessing module

Notes

This module has been deprecated in favor of pathos.pools.

mp_map(function, sequence, *args, **kwds)

extend python’s parallel map function to multiprocessing

Parameters:
  • function – target function

  • sequence – sequence to process in parallel

  • nproc – number of 'local' cpus to use [default = 'autodetect']

  • type – processing type ['blocking', 'non-blocking', 'unordered']

  • threads – if True, use threading instead of multiprocessing
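
For example (a sketch; mp_map is deprecated, so pathos.pools is preferred for new code):

>>> from pathos.mp_map import mp_map
>>> def squared(x):
...     return x*x
...
>>> mp_map(squared, range(5), nproc=2, type='blocking')
[0, 1, 4, 9, 16]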

multiprocessing module

This module contains map and pipe interfaces to python’s multiprocessing module.

Pipe methods provided:

pipe - blocking communication pipe [returns: value]
apipe - asynchronous communication pipe [returns: object]

Map methods provided:

map - blocking and ordered worker pool [returns: list]
imap - non-blocking and ordered worker pool [returns: iterator]
uimap - non-blocking and unordered worker pool [returns: iterator]
amap - asynchronous worker pool [returns: object]

Usage

A typical call to a pathos multiprocessing map will roughly follow this example:

>>> # instantiate and configure the worker pool
>>> from pathos.multiprocessing import ProcessPool
>>> pool = ProcessPool(nodes=4)
>>>
>>> # do a blocking map on the chosen function
>>> print(pool.map(pow, [1,2,3,4], [5,6,7,8]))
>>>
>>> # do a non-blocking map, then extract the results from the iterator
>>> results = pool.imap(pow, [1,2,3,4], [5,6,7,8])
>>> print("...")
>>> print(list(results))
>>>
>>> # do an asynchronous map, then get the results
>>> results = pool.amap(pow, [1,2,3,4], [5,6,7,8])
>>> import time
>>> while not results.ready():
...     time.sleep(5); print(".", end=' ')
...
>>> print(results.get())
>>>
>>> # do one item at a time, using a pipe
>>> print(pool.pipe(pow, 1, 5))
>>> print(pool.pipe(pow, 2, 6))
>>>
>>> # do one item at a time, using an asynchronous pipe
>>> result1 = pool.apipe(pow, 1, 5)
>>> result2 = pool.apipe(pow, 2, 6)
>>> print(result1.get())
>>> print(result2.get())

Notes

This worker pool leverages python’s multiprocessing module, and thus has many of the limitations associated with that module. The function f and the sequences in args must be serializable. The maps in this worker pool have full functionality whether run from a script or in the python interpreter, and work reliably for both imported and interactively-defined functions. Unlike python’s multiprocessing module, pathos.multiprocessing maps can directly utilize functions that require multiple arguments.

class ProcessPool(*args, **kwds)

Bases: AbstractWorkerPool

Mapper that leverages python’s multiprocessing.

Important class members:

nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will autodetect processors.

NOTE: additional keyword input is optional, with:

id - identifier for the pool
initializer - function that takes no input, called when node is spawned
initargs - tuple of args for initializers that have args
maxtasksperchild - int that limits the max number of tasks per node

__get_nodes()

get the number of nodes used in the map

__repr__()

Return repr(self).

__set_nodes(nodes)

set the number of nodes used in the map

__state__ = {2: <multiprocess.pool.Pool state=RUN pool_size=2>}
_clear()

Remove server with matching state

_serve(nodes=None)

Create a new server if one isn’t already initialized

amap(f, *args, **kwds)

run a batch of jobs with an asynchronous map

Returns a results object which contains the results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. To retrieve the results, call the get() method on the returned results object; the call to get() blocks until all results are retrieved. Use the ready() method on the results object to check if all results are ready. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

apipe(f, *args, **kwds)

submit a job asynchronously to a queue

Returns a results object which contains the result of calling the function f on a selected worker. To retrieve the result, call the get() method on the returned results object; the call to get() blocks until the result is available. Use the ready() method on the results object to check if the result is ready.

clear()

Remove server with matching state

close()

close the pool to any new jobs

imap(f, *args, **kwds)

run a batch of jobs with a non-blocking and ordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

join()

cleanup the closed worker processes

map(f, *args, **kwds)

run a batch of jobs with a blocking and ordered map

Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

property ncpus

get the number of nodes used in the map

property nodes

get the number of nodes used in the map

pipe(f, *args, **kwds)

submit a job and block until results are available

Returns result of calling the function f on a selected worker. This function will block until results are available.

restart(force=False)

restart a closed pool

terminate()

a more abrupt close

uimap(f, *args, **kwds)

run a batch of jobs with a non-blocking and unordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. The order of the resulting sequence is not guaranteed. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

_ProcessPool

alias of Pool

parallel module

This module contains map and pipe interfaces to the parallelpython (pp) module.

Pipe methods provided:

pipe - blocking communication pipe [returns: value]
apipe - asynchronous communication pipe [returns: object]

Map methods provided:

map - blocking and ordered worker pool [returns: list]
imap - non-blocking and ordered worker pool [returns: iterator]
uimap - non-blocking and unordered worker pool [returns: iterator]
amap - asynchronous worker pool [returns: object]

Usage

A typical call to a pathos pp map will roughly follow this example:

>>> # instantiate and configure the worker pool
>>> from pathos.pp import ParallelPool
>>> pool = ParallelPool(nodes=4)
>>>
>>> # do a blocking map on the chosen function
>>> print(pool.map(pow, [1,2,3,4], [5,6,7,8]))
>>>
>>> # do a non-blocking map, then extract the results from the iterator
>>> results = pool.imap(pow, [1,2,3,4], [5,6,7,8])
>>> print("...")
>>> print(list(results))
>>>
>>> # do an asynchronous map, then get the results
>>> results = pool.amap(pow, [1,2,3,4], [5,6,7,8])
>>> import time
>>> while not results.ready():
...     time.sleep(5); print(".", end=' ')
...
>>> print(results.get())
>>>
>>> # do one item at a time, using a pipe
>>> print(pool.pipe(pow, 1, 5))
>>> print(pool.pipe(pow, 2, 6))
>>>
>>> # do one item at a time, using an asynchronous pipe
>>> result1 = pool.apipe(pow, 1, 5)
>>> result2 = pool.apipe(pow, 2, 6)
>>> print(result1.get())
>>> print(result2.get())

Notes

This worker pool leverages the parallelpython (pp) module, and thus has many of the limitations associated with that module. The function f and the sequences in args must be serializable. The maps in this worker pool have full functionality when run from a script, but may be somewhat limited when used in the python interpreter. Both imported and interactively-defined functions may fail in an interpreter session because the pool may be unable to find the source code for the target function. For a work-around, try:

>>> # instantiate and configure the worker pool
>>> from pathos.pp import ParallelPool
>>> pool = ParallelPool(nodes=4)
>>>
>>> # wrap the function, so it can be used interactively by the pool
>>> def wrapsin(*args, **kwds):
...      from math import sin
...      return sin(*args, **kwds)
>>>
>>> # do a blocking map using the wrapped function
>>> results = pool.map(wrapsin, [1,2,3,4,5])
class ParallelPool(*args, **kwds)

Bases: AbstractWorkerPool

Mapper that leverages parallelpython (i.e. pp) maps.

Important class members:

nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will autodetect processors.

NOTE: if a tuple of servers is not provided, defaults to localhost only.

NOTE: additional keyword input is optional, with:

id - identifier for the pool
servers - tuple of pp.Servers

__get_nodes()

get the number of nodes used in the map

__get_servers()

get the servers used in the map

__repr__()

Return repr(self).

__set_nodes(nodes)

set the number of nodes used in the map

__set_servers(servers)

set the servers used in the map

__state__ = {}
_clear()

Remove server with matching state

_equals(server)

check if the server is compatible

_is_alive(server=None, negate=False, run=True)
_serve(nodes=None, servers=None)

Create a new server if one isn’t already initialized

amap(f, *args, **kwds)

run a batch of jobs with an asynchronous map

Returns a results object which contains the results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. To retrieve the results, call the get() method on the returned results object; the call to get() blocks until all results are retrieved. Use the ready() method on the results object to check if all results are ready. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

apipe(f, *args, **kwds)

submit a job asynchronously to a queue

Returns a results object which contains the result of calling the function f on a selected worker. To retrieve the result, call the get() method on the returned results object; the call to get() blocks until the result is available. Use the ready() method on the results object to check if the result is ready.

clear()

Remove server with matching state

close()

close the pool to any new jobs

imap(f, *args, **kwds)

run a batch of jobs with a non-blocking and ordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

join()

cleanup the closed worker processes

map(f, *args, **kwds)

run a batch of jobs with a blocking and ordered map

Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

property ncpus

get the number of nodes used in the map

property nodes

get the number of nodes used in the map

pipe(f, *args, **kwds)

submit a job and block until results are available

Returns result of calling the function f on a selected worker. This function will block until results are available.

restart(force=False)

restart a closed pool

property servers

get the servers used in the map

terminate()

a more abrupt close

uimap(f, *args, **kwds)

run a batch of jobs with a non-blocking and unordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. The order of the resulting sequence is not guaranteed. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

stats(pool=None)

return a string containing stats response from the pp.Server

pools module

pools: pools of pathos workers, providing map and pipe constructs

class ParallelPool(*args, **kwds)

Bases: AbstractWorkerPool

Mapper that leverages parallelpython (i.e. pp) maps.

Important class members:

nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will autodetect processors.

NOTE: if a tuple of servers is not provided, defaults to localhost only.

NOTE: additional keyword input is optional, with:

id - identifier for the pool
servers - tuple of pp.Servers

__get_nodes()

get the number of nodes used in the map

__get_servers()

get the servers used in the map

__repr__()

Return repr(self).

__set_nodes(nodes)

set the number of nodes used in the map

__set_servers(servers)

set the servers used in the map

__state__ = {}
_clear()

Remove server with matching state

_equals(server)

check if the server is compatible

_is_alive(server=None, negate=False, run=True)
_serve(nodes=None, servers=None)

Create a new server if one isn’t already initialized

amap(f, *args, **kwds)

run a batch of jobs with an asynchronous map

Returns a results object which contains the results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. To retrieve the results, call the get() method on the returned results object; the call to get() blocks until all results are retrieved. Use the ready() method on the results object to check if all results are ready. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

apipe(f, *args, **kwds)

submit a job asynchronously to a queue

Returns a results object which contains the result of calling the function f on a selected worker. To retrieve the result, call the get() method on the returned results object; the call to get() blocks until the result is available. Use the ready() method on the results object to check if the result is ready.

clear()

Remove server with matching state

close()

close the pool to any new jobs

imap(f, *args, **kwds)

run a batch of jobs with a non-blocking and ordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

join()

cleanup the closed worker processes

map(f, *args, **kwds)

run a batch of jobs with a blocking and ordered map

Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

property ncpus

get the number of nodes used in the map

property nodes

get the number of nodes used in the map

pipe(f, *args, **kwds)

submit a job and block until results are available

Returns result of calling the function f on a selected worker. This function will block until results are available.

restart(force=False)

restart a closed pool

property servers

get the servers used in the map

terminate()

a more abrupt close

uimap(f, *args, **kwds)

run a batch of jobs with a non-blocking and unordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. The order of the resulting sequence is not guaranteed. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

class ProcessPool(*args, **kwds)

Bases: AbstractWorkerPool

Mapper that leverages python’s multiprocessing.

Important class members:

nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will autodetect processors.

NOTE: additional keyword input is optional, with:

id - identifier for the pool
initializer - function that takes no input, called when node is spawned
initargs - tuple of args for initializers that have args
maxtasksperchild - int that limits the max number of tasks per node

__get_nodes()

get the number of nodes used in the map

__repr__()

Return repr(self).

__set_nodes(nodes)

set the number of nodes used in the map

__state__ = {2: <multiprocess.pool.Pool state=RUN pool_size=2>}
_clear()

Remove server with matching state

_serve(nodes=None)

Create a new server if one isn’t already initialized

amap(f, *args, **kwds)

run a batch of jobs with an asynchronous map

Returns a results object which contains the results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. To retrieve the results, call the get() method on the returned results object; the call to get() blocks until all results are retrieved. Use the ready() method on the results object to check if all results are ready. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

apipe(f, *args, **kwds)

submit a job asynchronously to a queue

Returns a results object which contains the result of calling the function f on a selected worker. To retrieve the result, call the get() method on the returned results object; the call to get() blocks until the result is available. Use the ready() method on the results object to check if the result is ready.

clear()

Remove server with matching state

close()

close the pool to any new jobs

imap(f, *args, **kwds)

run a batch of jobs with a non-blocking and ordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

join()

cleanup the closed worker processes

map(f, *args, **kwds)

run a batch of jobs with a blocking and ordered map

Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

property ncpus

get the number of nodes used in the map

property nodes

get the number of nodes used in the map

pipe(f, *args, **kwds)

submit a job and block until results are available

Returns result of calling the function f on a selected worker. This function will block until results are available.

restart(force=False)

restart a closed pool

terminate()

a more abrupt close

uimap(f, *args, **kwds)

run a batch of jobs with a non-blocking and unordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. The order of the resulting sequence is not guaranteed. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

class SerialPool(*args, **kwds)

Bases: AbstractWorkerPool

Mapper that leverages standard (i.e. serial) python maps.

Important class members:

nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler

__get_nodes()

get the number of nodes in the pool

__set_nodes(nodes)

set the number of nodes in the pool

__state__ = {}
_exiting = False
_is_alive(negate=False, run=True)
clear()

hard restart

close()

close the pool to any new jobs

imap(f, *args, **kwds)

run a batch of jobs with a non-blocking and ordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

join()

cleanup the closed worker processes

map(f, *args, **kwds)

run a batch of jobs with a blocking and ordered map

Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

property nodes

get the number of nodes in the pool

pipe(f, *args, **kwds)

submit a job and block until results are available

Returns result of calling the function f on a selected worker. This function will block until results are available.

restart(force=False)

restart a closed pool

terminate()

a more abrupt close

class ThreadPool(*args, **kwds)

Bases: AbstractWorkerPool

Mapper that leverages python’s threading.

Important class members:

nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will autodetect processors.

NOTE: additional keyword input is optional, with:

id - identifier for the pool
initializer - function that takes no input, called when node is spawned
initargs - tuple of args for initializers that have args

__get_nodes()

get the number of nodes used in the map

__repr__()

Return repr(self).

__set_nodes(nodes)

set the number of nodes used in the map

__state__ = {2: <multiprocess.pool.ThreadPool state=RUN pool_size=2>}
_clear()

Remove server with matching state

_serve(nodes=None)

Create a new server if one isn’t already initialized

amap(f, *args, **kwds)

run a batch of jobs with an asynchronous map

Returns a results object which contains the results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. To retrieve the results, call the get() method on the returned results object; the call to get() blocks until all results are retrieved. Use the ready() method on the results object to check if all results are ready. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

apipe(f, *args, **kwds)

submit a job asynchronously to a queue

Returns a results object which contains the result of calling the function f on a selected worker. To retrieve the result, call the get() method on the returned results object; the call to get() blocks until the result is available. Use the ready() method on the results object to check if the result is ready.

clear()

Remove server with matching state

close()

close the pool to any new jobs

imap(f, *args, **kwds)

run a batch of jobs with a non-blocking and ordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

join()

cleanup the closed worker processes

map(f, *args, **kwds)

run a batch of jobs with a blocking and ordered map

Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

property nodes

get the number of nodes used in the map

property nthreads

get the number of nodes used in the map

pipe(f, *args, **kwds)

submit a job and block until results are available

Returns result of calling the function f on a selected worker. This function will block until results are available.

restart(force=False)

restart a closed pool

terminate()

a more abrupt close

uimap(f, *args, **kwds)

run a batch of jobs with a non-blocking and unordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. The order of the resulting sequence is not guaranteed. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

_ProcessPool

alias of Pool

_ThreadPool(processes=None, initializer=None, initargs=())
_clear(type=None)

destroy all cached pools (of the given type)

portpicker module

This script prints out an available port number.

class portnumber(min=0, max=65536)

Bases: object

port selector

Usage:
>>> pick = portnumber(min=1024,max=65535)
>>> print( pick() )

select a port number from a given range.

The first call will return a random number from the available range, and each subsequent call will return the next number in the range.

Parameters:
  • min – minimum port number [default = 0]

  • max – maximum port number [default = 65536]

__call__()

Call self as a function.

randomport(min=1024, max=65536)

select a random port number

Parameters:
  • min – minimum port number [default = 1024]

  • max – maximum port number [default = 65536]

pp module

This module contains map and pipe interfaces to the parallelpython (pp) module.

Pipe methods provided:

pipe - blocking communication pipe [returns: value] apipe - asynchronous communication pipe [returns: object]

Map methods provided:

map - blocking and ordered worker pool [returns: list] imap - non-blocking and ordered worker pool [returns: iterator] uimap - non-blocking and unordered worker pool [returns: iterator] amap - asynchronous worker pool [returns: object]

Usage

A typical call to a pathos pp map will roughly follow this example:

>>> # instantiate and configure the worker pool
>>> from pathos.pp import ParallelPool
>>> pool = ParallelPool(nodes=4)
>>>
>>> # do a blocking map on the chosen function
>>> print(pool.map(pow, [1,2,3,4], [5,6,7,8]))
>>>
>>> # do a non-blocking map, then extract the results from the iterator
>>> results = pool.imap(pow, [1,2,3,4], [5,6,7,8])
>>> print("...")
>>> print(list(results))
>>>
>>> # do an asynchronous map, then get the results
>>> results = pool.amap(pow, [1,2,3,4], [5,6,7,8])
>>> import time
>>> while not results.ready():
...     time.sleep(5); print(".", end=' ')
...
>>> print(results.get())
>>>
>>> # do one item at a time, using a pipe
>>> print(pool.pipe(pow, 1, 5))
>>> print(pool.pipe(pow, 2, 6))
>>>
>>> # do one item at a time, using an asynchronous pipe
>>> result1 = pool.apipe(pow, 1, 5)
>>> result2 = pool.apipe(pow, 2, 6)
>>> print(result1.get())
>>> print(result2.get())

Notes

This worker pool leverages the parallelpython (pp) module, and thus has many of the limitations associated with that module. The function f and the sequences in args must be serializable. The maps in this worker pool have full functionality when run from a script, but may be somewhat limited when used in the python interpreter: both imported and interactively-defined functions may fail if the pool cannot find the source code for the target function. For a work-around, try:

>>> # instantiate and configure the worker pool
>>> from pathos.pp import ParallelPool
>>> pool = ParallelPool(nodes=4)
>>>
>>> # wrap the function, so it can be used interactively by the pool
>>> def wrapsin(*args, **kwds):
...     from math import sin
...     return sin(*args, **kwds)
>>>
>>> # do a blocking map using the wrapped function
>>> results = pool.map(wrapsin, [1,2,3,4,5])

ParallelPythonPool

alias of ParallelPool

stats(pool=None)

return a string containing stats response from the pp.Server
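
For example, a sketch of retrieving stats after a map (stats is assumed to be importable alongside ParallelPool from pathos.pp):

>>> from pathos.pp import ParallelPool, stats
>>> pool = ParallelPool(nodes=2)
>>> results = pool.map(pow, [1,2,3,4], [5,6,7,8])
>>> print(stats(pool))    # job/worker statistics from the underlying pp.Server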

pp_map module

minimal interface to python’s pp (parallel python) module

Implements a work-alike of the builtin map function that distributes work across many processes. As it uses ppft to do the actual parallel processing, code using this must conform to the usual ppft restrictions (arguments must be serializable, etc).

Notes

This module has been deprecated in favor of pathos.pools.

ppServer

alias of Server

pp_map(function, sequence, *args, **kwds)

extend python’s parallel map function to parallel python

Parameters:
  • function – target function

  • sequence – sequence to process in parallel

  • ncpus – number of 'local' processors to use [default = 'autodetect']

  • servers – available distributed parallel python servers [default = ()]

ppmap(processes, function, sequence, *sequences)

Split the work of ‘function’ across the given number of processes. Set ‘processes’ to None to let Parallel Python autodetect the number of children to use.

Although the calling semantics should be identical to __builtin__.map (even using __builtin__.map to process arguments), it differs in that it returns a generator instead of a list. This enables lazy evaluation of the results so that other work can be done while the subprocesses are still running.

>>> def rangetotal(n): return n, sum(range(n))
>>> list(map(rangetotal, range(1, 6)))
[(1, 0), (2, 1), (3, 3), (4, 6), (5, 10)]
>>> list(ppmap(1, rangetotal, range(1, 6)))
[(1, 0), (2, 1), (3, 3), (4, 6), (5, 10)]

print_stats(servers=None)

print stats from the pp.Server

stats(pool=None)

return a string containing stats response from the pp.Server

profile module

This module contains functions for profiling in other threads and processes.

Functions for identifying a thread/process:

process_id - get the identifier (process id) for the current process
thread_id - get the identifier for the current thread

Functions for controlling profiling:

enable_profiling - initialize a profiler in the current thread/process
start_profiling - begin profiling everything in the current thread/process
stop_profiling - stop profiling everything in the current thread/process
disable_profiling - remove the profiler from the current thread/process

Functions that control profile statistics (pstats) output:

clear_stats - clear stored pstats from the current thread/process
get_stats - get stored pstats for the current thread/process
print_stats - print stored pstats for the current thread/process
dump_stats - dump stored pstats for the current thread/process

Functions that add/remove profiling:

profiled - decorator to add profiling to a function
not_profiled - decorator to remove profiling from a function
profile - decorator for profiling a function (will enable_profiling)

Usage

Typical calls to pathos profiling will roughly follow this example:

 >>> import time
 >>> import random
 >>> import pathos.profile as pr
 >>>
 >>> # build a worker function
 >>> def _work(i):
 ...     x = random.random()
 ...     time.sleep(x)
 ...     return (i,x)
 >>>
 >>> # generate a 'profiled' work function
 >>> config = dict(gen=pr.process_id)
 >>> work = pr.profiled(**config)(_work)
 >>>
 >>> # enable profiling
 >>> pr.enable_profiling()
 >>>
 >>> # profile the work (not the map internals) in the main process
 >>> for i in map(work, range(-10,0)):
 ...     print(i)
 ...
 >>> # profile the map in the main process, and work in the other process
 >>> from pathos.helpers import mp
 >>> pool = mp.Pool(10)
 >>> _uimap = pr.profiled(**config)(pool.imap_unordered)
 >>> for i in _uimap(work, range(-10,0)):
 ...     print(i)
 ...
 >>> # deactivate all profiling
 >>> pr.disable_profiling() # in the main process
 >>> tuple(_uimap(pr.disable_profiling, range(10))) # in the workers
 >>> for i in _uimap(work, range(-20,-10)):
 ...     print(i)
 ...
 >>> # re-activate profiling
 >>> pr.enable_profiling()
 >>>
 >>> # print stats for profile of 'import math' in another process
 >>> def test_import(module):
 ...    __import__(module)
 ...
 >>> import pathos.pools as pp
 >>> pool = pp.ProcessPool(1)
 >>> pr.profile('cumulative', pipe=pool.pipe)(test_import, 'pox')
      10 function calls in 0.003 seconds

Ordered by: cumulative time

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1    0.000    0.000    0.003    0.003 <stdin>:1(test_import)
     1    0.002    0.002    0.003    0.003 {__import__}
     1    0.001    0.001    0.001    0.001 __init__.py:8(<module>)
     1    0.000    0.000    0.000    0.000 shutils.py:11(<module>)
     1    0.000    0.000    0.000    0.000 _disk.py:15(<module>)
     1    0.000    0.000    0.000    0.000 {eval}
     1    0.000    0.000    0.000    0.000 utils.py:11(<module>)
     1    0.000    0.000    0.000    0.000 <string>:1(<module>)
     1    0.000    0.000    0.000    0.000 info.py:2(<module>)
     1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}


 >>> pool.close()
 >>> pool.join()
 >>> pool.clear()

Notes

This module leverages python’s cProfile module, and is primarily a high-level interface to that module intended to make profiling in a different thread or process easier. The use of pathos.pools is suggested, but not required (as seen in the example above).

In many cases, profiling in another thread is not necessary, and either of the following can be sufficient/better for timing and profiling:

$ python -c "import time; s=time.time(); import pathos; print (time.time()-s)"
$ python -c "import cProfile; p=cProfile.Profile(); p.enable(); import pathos; p.print_stats('cumulative')"

This module was inspired by: http://stackoverflow.com/a/32522579/4646678.

clear_stats(*args)

clear all stored profiling results from the current thread/process

disable_profiling(*args)

remove the profiler instance from the current thread/process

dump_stats(*args, **kwds)

dump all stored profiling results for the current thread/process

Notes

see pathos.profile.profiled for settings for *args and **kwds

enable_profiling(*args)

initialize a profiler instance in the current thread/process

get_stats(*args)

get all stored profiling results for the current thread/process

not_profiled(f)

decorator to remove profiling (due to ‘profiled’) from a function

print_stats(*args, **kwds)

print all stored profiling results for the current thread/process
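
A minimal sketch of the start/stop/print flow in the current process (assuming, as a simple local use, that these functions can be called with no arguments):

>>> import pathos.profile as pr
>>> pr.enable_profiling()            # initialize a profiler here
>>> pr.start_profiling()             # begin collecting
>>> sum(i*i for i in range(10**5))   # some work to profile
>>> pr.stop_profiling()              # stop collecting
>>> pr.print_stats()                 # print the stored pstats
>>> pr.disable_profiling()           # remove the profiler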

process_id()

get the identifier (process id) for the current process

class profile(sort=None, **config)

Bases: object

decorator for profiling a function (will enable profiling)

sort is integer index of column in pstats output for sorting

Important class members:

pipe - pipe instance in which profiling is active

Example

>>> import time
>>> import random
>>> import pathos.profile as pr
>>>
>>> def work():
...     x = random.random()
...     time.sleep(x)
...     return x
...
>>> # profile the work; print pstats info
>>> pr.profile()(work)
         4 function calls in 0.136 seconds

Ordered by: standard name

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1    0.000    0.000    0.136    0.136 <stdin>:1(work)
     1    0.000    0.000    0.000    0.000 {method ‘disable’ of ‘_lsprof.Profiler’ objects}
     1    0.000    0.000    0.000    0.000 {method ‘random’ of ‘_random.Random’ objects}
     1    0.136    0.136    0.136    0.136 {time.sleep}

0.1350568110491419

NOTE: the pipe provided should come from a pool built with nodes=1. Other configuration keywords (config) are passed to ‘pr.profiled’. Output can be ordered by setting sort to one of the following:

‘calls’ - call count
‘cumulative’ - cumulative time
‘cumtime’ - cumulative time
‘file’ - file name
‘filename’ - file name
‘module’ - file name
‘ncalls’ - call count
‘pcalls’ - primitive call count
‘line’ - line number
‘name’ - function name
‘nfl’ - name/file/line
‘stdname’ - standard name
‘time’ - internal time
‘tottime’ - internal time
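
For example, combining sort with a pipe from a single-worker pool (a sketch mirroring the module-level usage, reusing the work function defined in the example above):

>>> import pathos.pools as pp
>>> pool = pp.ProcessPool(1)                          # pipe should come from a nodes=1 pool
>>> pr.profile('cumulative', pipe=pool.pipe)(work)    # profile work() in the worker, sorted by cumulative time
>>> pool.close(); pool.join(); pool.clear()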

__call__(function, *args, **kwds)

Call self as a function.

class profiled(gen=None, prefix='id-', suffix='.prof')

Bases: object

decorator for profiling a function (does not call enable profiling)

y=gen(), with y an identifier (e.g. current_process().pid)

Important class members:

prefix - string prefix for pstats filename [default: ‘id-‘]
suffix - string suffix for pstats filename [default: ‘.prof’]
pid - function for obtaining id of current process/thread
sort - integer index of column in pstats output for sorting

Example

>>> import time
>>> import random
>>> import pathos.profile as pr
>>>
>>> config = dict(gen=pr.process_id)
>>> @pr.profiled(**config)
... def work(i):
...     x = random.random()
...     time.sleep(x)
...     return (i,x)
...
>>> pr.enable_profiling()
>>> # profile the work (not the map internals); write to file for pstats
>>> for i in map(work, range(-10,0)):
...     print(i)
...
NOTE: If gen is a bool or string, then sort=gen and pid is not used. Otherwise, pid=gen and sort is not used. Output can be ordered by setting gen to one of the following:

‘calls’ - call count
‘cumulative’ - cumulative time
‘cumtime’ - cumulative time
‘file’ - file name
‘filename’ - file name
‘module’ - file name
‘ncalls’ - call count
‘pcalls’ - primitive call count
‘line’ - line number
‘name’ - function name
‘nfl’ - name/file/line
‘stdname’ - standard name
‘time’ - internal time
‘tottime’ - internal time
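
The files written by profiled are standard pstats dumps, so they can be inspected afterwards; a minimal sketch (the ‘id-*.prof’ pattern assumes the default prefix/suffix and gen=pr.process_id, as in the example above):

>>> import glob, pstats
>>> for fname in glob.glob('id-*.prof'):
...     pstats.Stats(fname).sort_stats('cumulative').print_stats(5)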

__call__(f)

Call self as a function.

start_profiling(*args)

begin profiling everything in the current thread/process

stop_profiling(*args)

stop profiling everything in the current thread/process

thread_id()

get the identifier for the current thread

python module

This module contains map and pipe interfaces to standard (i.e. serial) python.

Pipe methods provided:

pipe - blocking communication pipe [returns: value]

Map methods provided:

map - blocking and ordered worker pool [returns: list]
imap - non-blocking and ordered worker pool [returns: iterator]

Usage

A typical call to a pathos python map will roughly follow this example:

>>> # instantiate and configure the worker pool
>>> from pathos.serial import SerialPool
>>> pool = SerialPool()
>>>
>>> # do a blocking map on the chosen function
>>> print(pool.map(pow, [1,2,3,4], [5,6,7,8]))
>>>
>>> # do a non-blocking map, then extract the results from the iterator
>>> results = pool.imap(pow, [1,2,3,4], [5,6,7,8])
>>> print("...")
>>> print(list(results))
>>>
>>> # do one item at a time, using a pipe
>>> print(pool.pipe(pow, 1, 5))
>>> print(pool.pipe(pow, 2, 6))

Notes

This worker pool leverages the built-in python maps, and thus does not have limitations due to serialization of the function f or the sequences in args. The maps in this worker pool have full functionality whether run from a script or in the python interpreter, and work reliably for both imported and interactively-defined functions.

PythonSerial

alias of SerialPool

secure module

class Copier(name=None, **kwds)

Bases: Pipe

a popen-based copier for parallel and distributed computing.

create a copier

Inputs:

name: a unique identifier (string) for the launcher
source: hostname:path of original [user@host:path is also valid]
destination: hostname:path for copy [user@host:path is also valid]
launcher: remote service mechanism (i.e. scp, cp) [default = ‘scp’]
options: remote service options (i.e. -v, -P) [default = ‘’]
background: run in background [default = False]
decode: ensure response is ‘ascii’ [default = True]
stdin: file-like object to serve as standard input for the remote process

__call__(**kwds)

configure the copier using given keywords:

(Re)configure the copier for the following inputs:

source: hostname:path of original [user@host:path is also valid]
destination: hostname:path for copy [user@host:path is also valid]
launcher: remote service mechanism (i.e. scp, cp) [default = ‘scp’]
options: remote service options (i.e. -v, -P) [default = ‘’]
background: run in background [default = False]
decode: ensure response is ‘ascii’ [default = True]
stdin: file-like object to serve as standard input for the remote process

config(**kwds)

configure the copier using given keywords:

(Re)configure the copier for the following inputs:

source: hostname:path of original [user@host:path is also valid]
destination: hostname:path for copy [user@host:path is also valid]
launcher: remote service mechanism (i.e. scp, cp) [default = ‘scp’]
options: remote service options (i.e. -v, -P) [default = ‘’]
background: run in background [default = False]
decode: ensure response is ‘ascii’ [default = True]
stdin: file-like object to serve as standard input for the remote process
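
A minimal sketch of using the copier (hostname and path are placeholders; launch() and response() are assumed to be inherited from the base Pipe and are not documented here):

>>> from pathos.secure import Copier
>>> copier = Copier('copy')
>>> copier(source='user@remote.host:data.txt', destination='.')
>>> copier.launch()              # execute the (scp) copy
>>> print(copier.response())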

class Pipe(name=None, **kwds)

Bases: Pipe

a popen-based ssh-pipe for parallel and distributed computing.

create an ssh pipe

Inputs:

name: a unique identifier (string) for the pipe
host: hostname to receive command [user@host is also valid]
command: a command to send [default = ‘echo <name>’]
launcher: remote service mechanism (i.e. ssh, rsh) [default = ‘ssh’]
options: remote service options (i.e. -v, -N, -L) [default = ‘’]
background: run in background [default = False]
decode: ensure response is ‘ascii’ [default = True]
stdin: file-like object to serve as standard input for the remote process

__call__(**kwds)

configure a remote command using given keywords:

(Re)configure the pipe for the following inputs:

host: hostname to receive command [user@host is also valid]
command: a command to send [default = ‘echo <name>’]
launcher: remote service mechanism (i.e. ssh, rsh) [default = ‘ssh’]
options: remote service options (i.e. -v, -N, -L) [default = ‘’]
background: run in background [default = False]
decode: ensure response is ‘ascii’ [default = True]
stdin: file-like object to serve as standard input for the remote process

config(**kwds)

configure a remote command using given keywords:

(Re)configure the pipe for the following inputs:

host: hostname to receive command [user@host is also valid]
command: a command to send [default = ‘echo <name>’]
launcher: remote service mechanism (i.e. ssh, rsh) [default = ‘ssh’]
options: remote service options (i.e. -v, -N, -L) [default = ‘’]
background: run in background [default = False]
decode: ensure response is ‘ascii’ [default = True]
stdin: file-like object to serve as standard input for the remote process
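
A minimal sketch of using the pipe (hostname is a placeholder; as with the copier, launch() and response() are assumed to come from the base class):

>>> from pathos.secure import Pipe
>>> p = Pipe('remote')
>>> p(host='user@remote.host', command='hostname')
>>> p.launch()                   # run the command over ssh
>>> print(p.response())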

class Tunnel(name=None, **kwds)

Bases: object

an ssh-tunnel launcher for parallel and distributed computing.

create an ssh tunnel launcher

Inputs:

name – a unique identifier (string) for the launcher

MAXPORT = 65535
MINPORT = 1024
__disconnect()

disconnect tunnel internals

__repr__()

Return repr(self).

_connect(localport, remotehost, remoteport, through=None)
connect(host, port=None, through=None)

establish a secure shell tunnel between local and remote host

Input:

host – remote hostname [user@host:path is also valid]
port – remote port number

Additional Input:

through – ‘tunnel-through’ hostname [default = None]

disconnect()

destroy the ssh tunnel

verbose = True
exception TunnelException

Bases: Exception

Exception for failure to establish ssh tunnel
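
A minimal sketch for the Tunnel class above (the hostname is a placeholder, and it is assumed that connect returns the selected local port):

>>> from pathos.secure import Tunnel
>>> tunnel = Tunnel('t1')
>>> lport = tunnel.connect('user@remote.host', 22)   # forward localhost:lport to remote:22
>>> # ... use the service at localhost:lport ...
>>> tunnel.disconnect()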

selector module

This module implements a selector class, which can be used to dispatch events and for event handler wrangling.

class Selector

Bases: object

Selector object for watching and event notification.

Takes no initial input.

_TIMEOUT = 0.5
_cleanup()
_debug = <Logger pathos.selector (WARNING)>
_dispatch(handlers, entities)
_watch()
notifyOnException(fd, handler)

add <handler> to the list of routines to call when <fd> raises an exception

notifyOnInterrupt(handler)

add <handler> to the list of routines to call when a signal arrives

notifyOnReadReady(fd, handler)

add <handler> to the list of routines to call when <fd> is read ready

notifyOnWriteReady(fd, handler)

add <handler> to the list of routines to call when <fd> is write ready

notifyWhenIdle(handler)

add <handler> to the list of routines to call when a timeout occurs

watch(timeout=None)

dispatch events to the registered handlers

serial module

This module contains map and pipe interfaces to standard (i.e. serial) python.

Pipe methods provided:

pipe - blocking communication pipe [returns: value]

Map methods provided:

map - blocking and ordered worker pool [returns: list]
imap - non-blocking and ordered worker pool [returns: iterator]

Usage

A typical call to a pathos python map will roughly follow this example:

>>> # instantiate and configure the worker pool
>>> from pathos.serial import SerialPool
>>> pool = SerialPool()
>>>
>>> # do a blocking map on the chosen function
>>> print(pool.map(pow, [1,2,3,4], [5,6,7,8]))
>>>
>>> # do a non-blocking map, then extract the results from the iterator
>>> results = pool.imap(pow, [1,2,3,4], [5,6,7,8])
>>> print("...")
>>> print(list(results))
>>>
>>> # do one item at a time, using a pipe
>>> print(pool.pipe(pow, 1, 5))
>>> print(pool.pipe(pow, 2, 6))

Notes

This worker pool leverages the built-in python maps, and thus does not have limitations due to serialization of the function f or the sequences in args. The maps in this worker pool have full functionality whether run from a script or in the python interpreter, and work reliably for both imported and interactively-defined functions.

class SerialPool(*args, **kwds)

Bases: AbstractWorkerPool

Mapper that leverages standard (i.e. serial) python maps.

Important class members:

nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler

__get_nodes()

get the number of nodes in the pool

__set_nodes(nodes)

set the number of nodes in the pool

__state__ = {}
_exiting = False
_is_alive(negate=False, run=True)
clear()

hard restart

close()

close the pool to any new jobs

imap(f, *args, **kwds)

run a batch of jobs with a non-blocking and ordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

join()

cleanup the closed worker processes

map(f, *args, **kwds)

run a batch of jobs with a blocking and ordered map

Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

property nodes

get the number of nodes in the pool

pipe(f, *args, **kwds)

submit a job and block until results are available

Returns result of calling the function f on a selected worker. This function will block until results are available.

restart(force=False)

restart a closed pool

terminate()

a more abrupt close

__get_nodes__(self)

get the number of nodes in the pool

__set_nodes__(self, nodes)

set the number of nodes in the pool

server module

This module contains the base class for pathos servers, and describes the pathos server interface. If a third-party RPC server is selected, such as ‘parallel python’ (i.e. ‘pp’) or ‘RPyC’, direct calls to the third-party interface are currently used.

class Server

Bases: object

Server base class for pathos servers for parallel and distributed computing.

Takes no initial input.

activate(onTimeout=None, selector=None)

configure the selector and install the timeout callback

deactivate()

turn off the selector

selector()

get the selector

serve(timeout)

begin serving, and set the timeout

threading module

This module contains map and pipe interfaces to python’s threading module.

Pipe methods provided:

pipe - blocking communication pipe [returns: value]
apipe - asynchronous communication pipe [returns: object]

Map methods provided:

map - blocking and ordered worker pool [returns: list]
imap - non-blocking and ordered worker pool [returns: iterator]
uimap - non-blocking and unordered worker pool [returns: iterator]
amap - asynchronous worker pool [returns: object]

Usage

A typical call to a pathos threading map will roughly follow this example:

>>> # instantiate and configure the worker pool
>>> from pathos.threading import ThreadPool
>>> pool = ThreadPool(nodes=4)
>>>
>>> # do a blocking map on the chosen function
>>> print(pool.map(pow, [1,2,3,4], [5,6,7,8]))
>>>
>>> # do a non-blocking map, then extract the results from the iterator
>>> results = pool.imap(pow, [1,2,3,4], [5,6,7,8])
>>> print("...")
>>> print(list(results))
>>>
>>> # do an asynchronous map, then get the results
>>> results = pool.amap(pow, [1,2,3,4], [5,6,7,8])
>>> while not results.ready():
...     time.sleep(5); print(".", end=' ')
...
>>> print(results.get())
>>>
>>> # do one item at a time, using a pipe
>>> print(pool.pipe(pow, 1, 5))
>>> print(pool.pipe(pow, 2, 6))
>>>
>>> # do one item at a time, using an asynchronous pipe
>>> result1 = pool.apipe(pow, 1, 5)
>>> result2 = pool.apipe(pow, 2, 6)
>>> print(result1.get())
>>> print(result2.get())

Notes

This worker pool leverages python’s multiprocessing.dummy module, and thus has many of the limitations associated with that module. The function f and the sequences in args must be serializable. The maps in this worker pool have full functionality whether run from a script or in the python interpreter, and work reliably for both imported and interactively-defined functions. Unlike python’s multiprocessing.dummy module, pathos.threading maps can directly utilize functions that require multiple arguments.

class ThreadPool(*args, **kwds)

Bases: AbstractWorkerPool

Mapper that leverages python’s threading.

Important class members:

nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files

Other class members:

scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will autodetect processors.

NOTE: additional keyword input is optional, with:

id - identifier for the pool
initializer - function that takes no input, called when node is spawned
initargs - tuple of args for initializers that have args
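
For illustration, a minimal sketch using the optional initializer keyword (the initializer takes no input and runs once in each worker when it is spawned):

>>> from pathos.threading import ThreadPool
>>> import threading
>>>
>>> def hello():
...     print('spawned', threading.current_thread().name)
...
>>> pool = ThreadPool(nodes=2, initializer=hello)
>>> print(pool.map(pow, [1,2,3,4], [5,6,7,8]))
>>> pool.close(); pool.join(); pool.clear()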

__get_nodes()

get the number of nodes used in the map

__repr__()

Return repr(self).

__set_nodes(nodes)

set the number of nodes used in the map

__state__ = {2: <multiprocess.pool.ThreadPool state=RUN pool_size=2>}
_clear()

Remove server with matching state

_serve(nodes=None)

Create a new server if one isn’t already initialized

amap(f, *args, **kwds)

run a batch of jobs with an asynchronous map

Returns a results object which contains the results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. To retrieve the results, call the get() method on the returned results object. The call to get() is blocking, until all results are retrieved. Use the ready() method on the result object to check if all results are ready. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

apipe(f, *args, **kwds)

submit a job asynchronously to a queue

Returns a results object which contains the result of calling the function f on a selected worker. To retrieve the results, call the get() method on the returned results object. The call to get() is blocking, until the result is available. Use the ready() method on the results object to check if the result is ready.

clear()

Remove server with matching state

close()

close the pool to any new jobs

imap(f, *args, **kwds)

run a batch of jobs with a non-blocking and ordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

join()

cleanup the closed worker processes

map(f, *args, **kwds)

run a batch of jobs with a blocking and ordered map

Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

property nodes

get the number of nodes used in the map

property nthreads

get the number of nodes used in the map

pipe(f, *args, **kwds)

submit a job and block until results are available

Returns result of calling the function f on a selected worker. This function will block until results are available.

restart(force=False)

restart a closed pool

terminate()

a more abrupt close

uimap(f, *args, **kwds)

run a batch of jobs with a non-blocking and unordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. The order of the resulting sequence is not guaranteed. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.

_ThreadPool(processes=None, initializer=None, initargs=())

util module

utilities for distributed computing

_b(string, codec=None)

convert string to bytes using the given codec (default is ‘ascii’)

_str(byte, codec=None)

convert bytes to string using the given codec (default is ‘ascii’)
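
For illustration (these are internal helpers):

>>> from pathos.util import _b, _str
>>> _b('ping')
b'ping'
>>> _str(b'pong')
'pong'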

print_exc_info()

thread-safe return of string from print_exception call

spawn(onParent, onChild)

a unidirectional fork wrapper

Calls onParent(pid, fromchild) in parent process,

onChild(pid, toparent) in child process.

spawn2(onParent, onChild)

a bidirectional fork wrapper

Calls onParent(pid, fromchild, tochild) in parent process,

onChild(pid, fromparent, toparent) in child process.
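
A minimal sketch of spawn (it is assumed, from the argument names, that fromchild and toparent are file-like objects for reading from and writing to the other side of the fork):

>>> import os
>>> from pathos.util import spawn
>>>
>>> def onParent(pid, fromchild):
...     print('child %d says: %s' % (pid, fromchild.readline().strip()))
...
>>> def onChild(pid, toparent):
...     toparent.write('hello from %d\n' % os.getpid())
...     toparent.flush()
...
>>> spawn(onParent, onChild)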

xmlrpc module

class XMLRPCRequestHandler(server, socket)

Bases: BaseHTTPRequestHandler

create an XML-RPC request handler

Override BaseHTTPRequestHandler.__init__(): we need to be able to have (potentially) multiple handler objects at a given time.

Inputs:

server – server object to handle requests for
socket – socket connection

_debug = <Logger pathos.xmlrpc (WARNING)>
_sendResponse(response)

Write the XML-RPC response

do_POST()

Access point from HTTP handler

log_message(format, *args)

Overriding BaseHTTPRequestHandler.log_message()

class XMLRPCServer(host, port)

Bases: Server, SimpleXMLRPCDispatcher

extends base pathos server to an XML-RPC dispatcher

create an XML-RPC server

Takes two initial inputs:

host – hostname of XML-RPC server host
port – port number for server requests

_handleMessageFromChild(selector, fd)

handler for message from a child process

_installSocket(host, port)

prepare a listening socket

_marshaled_dispatch(data, dispatch_method=None)

override SimpleXMLRPCDispatcher._marshaled_dispatch() fault string

_onConnection(selector, fd)

upon socket connection, establish a request handler

_onSelectorIdle(selector)

something to do when there are no requests

_onSocketConnection(socket)

upon socket connections, establish a request handler

_registerChild(pid, fromchild)

register a child process so it can be retrieved on select events

_unRegisterChild(fd)

remove a child process from active process register

activate()

install callbacks

serve()

enter the select loop… and wait for service requests
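
A minimal sketch (the import path and port are illustrative; register_function is assumed to be available from the SimpleXMLRPCDispatcher base):

>>> from pathos.xmlrpc import XMLRPCServer
>>> server = XMLRPCServer('localhost', 8000)
>>> server.register_function(pow)
>>> server.activate()     # install the selector callbacks
>>> server.serve()        # enter the select loop and wait for requests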