pathos module documentation

abstract_launcher module

This module contains the base classes for pathos pool and pipe objects, and describes the map and pipe interfaces. A pipe is defined as a connection between two ‘nodes’, where a node is something that does work. A pipe may be a one-way or two-way connection. A map is defined as a one-to-many connection between nodes. In both map and pipe connections, results from the connected nodes can be returned to the calling node. There are several variants of pipe and map, such as whether the connection is blocking, ordered, or asynchronous. For pipes, a derived class must overwrite the ‘pipe’ method, while for maps, a derived class must overwrite the ‘map’ method. Pipes and maps are available from worker pool objects, where the work is done by any of the workers in the pool. For more specific point-to-point connections (such as a pipe between two specific compute nodes), use the pipe object directly.

Usage

A typical call to a pathos map will roughly follow this example:

>>> # instantiate and configure the worker pool
>>> from pathos.pools import ProcessPool
>>> pool = ProcessPool(nodes=4)
>>>
>>> # do a blocking map on the chosen function
>>> results = pool.map(pow, [1,2,3,4], [5,6,7,8])
>>>
>>> # do a non-blocking map, then extract the results from the iterator
>>> results = pool.imap(pow, [1,2,3,4], [5,6,7,8])
>>> print("...")
>>> results = list(results)
>>>
>>> # do an asynchronous map, then get the results
>>> import time
>>> results = pool.amap(pow, [1,2,3,4], [5,6,7,8])
>>> while not results.ready():
...     time.sleep(5); print(".", end=' ')
...
>>> results = results.get()
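
Pools also provide pipe constructs for single jobs, as documented below; a minimal sketch, reusing the pool from above:

>>> # submit a single blocking job, using a pipe
>>> pool.pipe(pow, 2, 6)
64
>>> # do a non-blocking, unordered map; results arrive as they complete
>>> results = pool.uimap(pow, [1,2,3,4], [5,6,7,8])
>>> sorted(results)
[1, 64, 2187, 65536]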

Notes

Each of the pathos worker pools relies on a different transport protocol (e.g. threads, multiprocessing, etc.), and the use of each pool comes with a few caveats. See the usage documentation and examples for each worker pool for more information.

class AbstractPipeConnection(*args, **kwds)

Bases: object

AbstractPipeConnection base class for pathos pipes.

Required input:
???
Additional inputs:
???
Important class members:
???
Other class members:
???
__dict__ = dict_proxy({'__module__': 'pathos.abstract_launcher', '__repr__': <function __repr__>, '__dict__': <attribute '__dict__' of 'AbstractPipeConnection' objects>, '__weakref__': <attribute '__weakref__' of 'AbstractPipeConnection' objects>, '__doc__': '\nAbstractPipeConnection base class for pathos pipes.\n ', '__init__': <function __init__>})
__init__(*args, **kwds)
Required input:
???
Additional inputs:
???
Important class members:
???
Other class members:
???
__module__ = 'pathos.abstract_launcher'
__repr__() <==> repr(x)
__weakref__

list of weak references to the object (if defined)

class AbstractWorkerPool(*args, **kwds)

Bases: object

AbstractWorkerPool base class for pathos pools.

Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
Other class members:
scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
_AbstractWorkerPool__get_nodes()

get the number of nodes in the pool

_AbstractWorkerPool__imap(f, *args, **kwds)

default filter for imap inputs

_AbstractWorkerPool__init(*args, **kwds)

default filter for __init__ inputs

_AbstractWorkerPool__map(f, *args, **kwds)

default filter for map inputs

_AbstractWorkerPool__nodes = 1
_AbstractWorkerPool__pipe(f, *args, **kwds)

default filter for pipe inputs

_AbstractWorkerPool__set_nodes(nodes)

set the number of nodes in the pool

__dict__ = dict_proxy({'map': <function map>, '__module__': 'pathos.abstract_launcher', '__exit__': <function __exit__>, '__dict__': <attribute '__dict__' of 'AbstractWorkerPool' objects>, '_serve': <function _serve>, 'uimap': <function uimap>, 'amap': <function amap>, '_AbstractWorkerPool__imap': <function __imap>, '_AbstractWorkerPool__init': <function __init>, '_AbstractWorkerPool__get_nodes': <function __get_nodes>, '_AbstractWorkerPool__pipe': <function __pipe>, '__weakref__': <attribute '__weakref__' of 'AbstractWorkerPool' objects>, 'imap': <function imap>, '__init__': <function __init__>, 'apipe': <function apipe>, 'clear': <function clear>, '__enter__': <function __enter__>, '_AbstractWorkerPool__set_nodes': <function __set_nodes>, 'pipe': <function pipe>, '_AbstractWorkerPool__map': <function __map>, '_AbstractWorkerPool__nodes': 1, '__repr__': <function __repr__>, '__doc__': '\nAbstractWorkerPool base class for pathos pools.\n '})
__enter__()
__exit__(*args)
__init__(*args, **kwds)
Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
Other class members:
scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
__module__ = 'pathos.abstract_launcher'
__repr__() <==> repr(x)
__weakref__

list of weak references to the object (if defined)

_serve(*args, **kwds)

Create a new server if one isn’t already initialized

amap(f, *args, **kwds)

run a batch of jobs with an asynchronous map

Returns a results object which contains the results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. To retrieve the results, call the get() method on the returned results object. The call to get() blocks until all results are retrieved. Use the ready() method on the results object to check if all results are ready.

apipe(f, *args, **kwds)

submit a job asynchronously to a queue

Returns a results object which contains the result of calling the function f on a selected worker. To retrieve the result, call the get() method on the returned results object. The call to get() blocks until the result is available. Use the ready() method on the results object to check if the result is ready.

clear()

Remove server with matching state

imap(f, *args, **kwds)

run a batch of jobs with a non-blocking and ordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence.

map(f, *args, **kwds)

run a batch of jobs with a blocking and ordered map

Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence.

pipe(f, *args, **kwds)

submit a job and block until results are available

Returns result of calling the function f on a selected worker. This function will block until results are available.

uimap(f, *args, **kwds)

run a batch of jobs with a non-blocking and unordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. The order of the resulting sequence is not guaranteed.

connection module

This module contains the base class for popen pipes, and describes the popen pipe interface. The ‘config’ method can be overwritten for pipe customization. The pipe’s ‘launch’ method can be overwritten with a derived pipe’s new execution algorithm. See the following for an example of standard use.

Usage

A typical call to a popen ‘pipe’ will roughly follow this example:

>>> # instantiate the pipe
>>> pipe = Pipe()
>>>
>>> # configure the pipe to stage the command
>>> pipe(command='hostname')
>>>
>>> # execute the launch and retrieve the response
>>> pipe.launch()
>>> print(pipe.response())
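
For longer-running commands, the pipe can be configured to run in the background, then polled for a response; a minimal sketch (the name and command shown are arbitrary):

>>> # configure a background pipe, then poll for the response
>>> from pathos.connection import Pipe
>>> pipe = Pipe('mypipe', command='ls /tmp', background=True)
>>> pipe.launch()
>>> pipe.response()  # may be None until the background process responds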
class Pipe(name=None, **kwds)

Bases: object

a popen-based pipe for parallel and distributed computing

create a popen-pipe

Inputs:
name: a unique identifier (string) for the pipe
command: a command to send [default = ‘echo <name>’]
background: run in background [default = False]
decode: ensure response is ‘ascii’ [default = True]
stdin: file-like object to serve as standard input for the remote process
__call__(**kwds)

configure the pipe using given keywords

(Re)configure the pipe for the following inputs:
command: a command to send [default = ‘echo <name>’]
background: run in background [default = False]
decode: ensure response is ‘ascii’ [default = True]
stdin: file-like object to serve as standard input for the remote process
__dict__ = dict_proxy({'__module__': 'pathos.connection', 'verbose': True, '_debug': <logging.Logger object>, 'launch': <function launch>, '_execute': <function _execute>, '__weakref__': <attribute '__weakref__' of 'Pipe' objects>, 'pid': <function pid>, '__doc__': 'a popen-based pipe for parallel and distributed computing', '__dict__': <attribute '__dict__' of 'Pipe' objects>, 'kill': <function kill>, '__repr__': <function __repr__>, '__call__': <function config>, 'config': <function config>, 'response': <function response>, '__init__': <function __init__>})
__init__(name=None, **kwds)

create a popen-pipe

Inputs:
name: a unique identifier (string) for the pipe
command: a command to send [default = ‘echo <name>’]
background: run in background [default = False]
decode: ensure response is ‘ascii’ [default = True]
stdin: file-like object to serve as standard input for the remote process
__module__ = 'pathos.connection'
__repr__() <==> repr(x)
__weakref__

list of weak references to the object (if defined)

_debug = <logging.Logger object>
_execute()
config(**kwds)

configure the pipe using given keywords

(Re)configure the pipe for the following inputs:
command: a command to send [default = ‘echo <name>’]
background: run in background [default = False]
decode: ensure response is ‘ascii’ [default = True]
stdin: file-like object to serve as standard input for the remote process
kill()

terminate the pipe

launch()

launch a configured command

pid()

get pipe pid

response()

Return the response from the launched process. Return None if no response was received yet from a background process.

verbose = True
exception PipeException

Bases: exceptions.Exception

Exception for failure to launch a command

__module__ = 'pathos.connection'
__weakref__

list of weak references to the object (if defined)

core module

high-level programming interface to core pathos utilities

copy(source, destination=None, **kwds)

copy source to (possibly) remote destination

Execute a copy, and return the copier. Use ‘kill’ to kill the copier, and ‘pid’ to get the process id for the copier.

Inputs:
source – path string of source ‘file’
destination – path string for destination target
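
A minimal sketch, with hypothetical paths and hostname:

>>> # copy a local file to a (hypothetical) remote destination
>>> from pathos.core import copy
>>> copier = copy('./data.txt', 'remote.host:~/data.txt')
>>> copier.pid()  # process id for the copier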
execute(command, host=None, bg=True, **kwds)

execute a command (possibly) on a remote host

Execute a process, and return the launcher. Use ‘response’ to retrieve the response from the executed command. Use ‘kill’ to kill the launcher, and ‘pid’ to get the process id for the launcher.

Inputs:
command – command string to be executed
host – hostname of execution target [default = None (i.e. run locally)]
bg – run as background process? [default = True]
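
A minimal sketch, running locally:

>>> # execute a command locally, and retrieve the response
>>> from pathos.core import execute
>>> launcher = execute('hostname', host=None, bg=False)
>>> print(launcher.response())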
kill(pid, host=None, **kwds)

kill a process (possibly) on a remote host

Inputs:
pid – process id
host – hostname where process is running [default = None (i.e. locally)]
getpid(target=None, host=None, all=False, **kwds)

get the process id for a target process (possibly) running on remote host

This method should only be used as a last-ditch effort to find a process id. This method __may__ work when a child has been spawned and the pid was not registered… but there’s no guarantee.

If target is None, then get the process id of the __main__ python instance.

Inputs:
target – string name of target process
host – hostname where process is running
all – get all resulting lines from query? [default = False]
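
A minimal sketch, querying the current interpreter:

>>> # get the process id of the __main__ python instance
>>> from pathos.core import getpid
>>> pid = getpid()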
getppid(pid=None, host=None, group=False)

get parent process id (ppid) for the given process

If pid is None, the pid of the __main__ python instance will be used.

Inputs:
pid – process id
host – hostname where process is running
group – get parent group id (pgid) instead of direct parent id?
getchild(pid=None, host=None, group=False)

get all child process ids for the given parent process id (ppid)

If pid is None, the pid of the __main__ python instance will be used.

Inputs:
pid – parent process id
host – hostname where process is running
group – get process ids for the parent group id (pgid) instead?
serve(server, host=None, port=None, profile='.bash_profile')

begin serving RPC requests

Inputs:
server: name of RPC server (i.e. ‘ppserver’)
host: hostname on which a server should be launched
port: port number (on host) at which the server will accept requests
profile: file to configure the user’s environment [default = ‘.bash_profile’]
connect(host, port=None, through=None)

establish a secure tunnel connection to a remote host at the given port

Inputs:
host – hostname to which a tunnel should be established
port – port number (on host) to connect the tunnel to
through – ‘tunnel-through’ hostname [default = None]
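
A minimal sketch, assuming ssh access to a hypothetical host; the disconnect call is an assumption about the returned tunnel object:

>>> # establish a tunnel to a (hypothetical) remote host
>>> from pathos.core import connect
>>> tunnel = connect('remote.host', port=12345)
>>> tunnel.disconnect()  # assumed method to close the tunnel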
randomport(host=None)

select an open port on a (possibly) remote host

Inputs:
host – hostname on which to select an open port

helpers module

ProcessPool

alias of multiprocess.pool.Pool

ThreadPool(processes=None, initializer=None, initargs=())
cpu_count()

Returns the number of CPUs in the system

freeze_support()

Check whether this is a fake forked process in a frozen executable. If so, then run the code specified by the command line and exit.
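
A minimal sketch of the helpers aliases; note that ProcessPool here is the raw multiprocess.pool.Pool, so its map takes a single iterable:

>>> from pathos.helpers import ProcessPool, cpu_count
>>> pool = ProcessPool(cpu_count())
>>> pool.map(abs, [-1, -2, 3])
[1, 2, 3]
>>> pool.close(); pool.join()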

hosts module

high-level programming interface to pathos host registry

_profiles = {}

For example, to register two ‘known’ host profiles

_profiles = {'foobar.danse.us': '.profile',
             'computer.cacr.caltech.edu': '.cshrc'}

get_profile(rhost, assume=True)

get the default $PROFILE for a remote host

get_profiles()

get $PROFILE for each registered host

register(rhost, profile=None)

register a host and $PROFILE

register_profiles(profiles)

add dict of {‘host’:$PROFILE} to registered host profiles
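
A minimal sketch, using the hypothetical host from the example above:

>>> from pathos import hosts
>>> hosts.register('foobar.danse.us', profile='.profile')
>>> hosts.get_profile('foobar.danse.us')
'.profile'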

mp_map module

minimal interface to python’s multiprocessing module

Notes

This module has been deprecated in favor of pathos.pools.

mp_map(function, sequence, *args, **kwds)

extend python’s parallel map function to multiprocessing

Inputs:
function – target function
sequence – sequence to process in parallel
Additional Inputs:
nproc – number of ‘local’ cpus to use [default = ‘autodetect’]
type – processing type [‘blocking’, ‘non-blocking’, ‘unordered’]
threads – if True, use threading instead of multiprocessing
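
A minimal sketch of the deprecated interface (a blocking, ordered map is assumed as the default type):

>>> # square a sequence in parallel, with 2 local processes
>>> from pathos.mp_map import mp_map
>>> mp_map(lambda x: x*x, range(4), nproc=2)
[0, 1, 4, 9]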

multiprocessing module

This module contains map and pipe interfaces to python’s multiprocessing module.

Pipe methods provided:
pipe - blocking communication pipe [returns: value]
apipe - asynchronous communication pipe [returns: object]
Map methods provided:
map - blocking and ordered worker pool [returns: list]
imap - non-blocking and ordered worker pool [returns: iterator]
uimap - non-blocking and unordered worker pool [returns: iterator]
amap - asynchronous worker pool [returns: object]

Usage

A typical call to a pathos multiprocessing map will roughly follow this example:

>>> # instantiate and configure the worker pool
>>> from pathos.multiprocessing import ProcessPool
>>> pool = ProcessPool(nodes=4)
>>>
>>> # do a blocking map on the chosen function
>>> print(pool.map(pow, [1,2,3,4], [5,6,7,8]))
>>>
>>> # do a non-blocking map, then extract the results from the iterator
>>> results = pool.imap(pow, [1,2,3,4], [5,6,7,8])
>>> print("...")
>>> print(list(results))
>>>
>>> # do an asynchronous map, then get the results
>>> import time
>>> results = pool.amap(pow, [1,2,3,4], [5,6,7,8])
>>> while not results.ready():
...     time.sleep(5); print(".", end=' ')
...
>>> print(results.get())
>>>
>>> # do one item at a time, using a pipe
>>> print(pool.pipe(pow, 1, 5))
>>> print(pool.pipe(pow, 2, 6))
>>>
>>> # do one item at a time, using an asynchronous pipe
>>> result1 = pool.apipe(pow, 1, 5)
>>> result2 = pool.apipe(pow, 2, 6)
>>> print(result1.get())
>>> print(result2.get())

Notes

This worker pool leverages python’s multiprocessing module, and thus has many of the limitations associated with that module. The function f and the sequences in args must be serializable. The maps in this worker pool have full functionality whether run from a script or in the python interpreter, and work reliably for both imported and interactively-defined functions. Unlike python’s multiprocessing module, pathos.multiprocessing maps can directly utilize functions that require multiple arguments.

class ProcessPool(*args, **kwds)

Bases: pathos.abstract_launcher.AbstractWorkerPool

Mapper that leverages python’s multiprocessing.

Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
Other class members:
scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will autodetect processors

_ProcessPool__get_nodes()

get the number of nodes used in the map

_ProcessPool__set_nodes(nodes)

set the number of nodes used in the map

__init__(*args, **kwds)
Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
Other class members:
scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will autodetect processors

__module__ = 'pathos.multiprocessing'
__repr__() <==> repr(x)
_clear()

Remove server with matching state

_serve(nodes=None)

Create a new server if one isn’t already initialized

amap(f, *args, **kwds)

run a batch of jobs with an asynchronous map

Returns a results object which contains the results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. To retrieve the results, call the get() method on the returned results object. The call to get() blocks until all results are retrieved. Use the ready() method on the results object to check if all results are ready.

apipe(f, *args, **kwds)

submit a job asynchronously to a queue

Returns a results object which contains the result of calling the function f on a selected worker. To retrieve the result, call the get() method on the returned results object. The call to get() blocks until the result is available. Use the ready() method on the results object to check if the result is ready.

clear()

Remove server with matching state

close()

close the pool to any new jobs

imap(f, *args, **kwds)

run a batch of jobs with a non-blocking and ordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence.

join()

cleanup the closed worker processes

map(f, *args, **kwds)

run a batch of jobs with a blocking and ordered map

Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence.

ncpus

get the number of nodes used in the map

nodes

get the number of nodes used in the map

pipe(f, *args, **kwds)

submit a job and block until results are available

Returns result of calling the function f on a selected worker. This function will block until results are available.

restart(force=False)

restart a closed pool

terminate()

a more abrupt close

uimap(f, *args, **kwds)

run a batch of jobs with a non-blocking and unordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. The order of the resulting sequence is not guaranteed.

_ProcessPool

alias of multiprocess.pool.Pool

parallel module

This module contains map and pipe interfaces to the parallelpython (pp) module.

Pipe methods provided:
pipe - blocking communication pipe [returns: value]
apipe - asynchronous communication pipe [returns: object]
Map methods provided:
map - blocking and ordered worker pool [returns: list]
imap - non-blocking and ordered worker pool [returns: iterator]
uimap - non-blocking and unordered worker pool [returns: iterator]
amap - asynchronous worker pool [returns: object]

Usage

A typical call to a pathos pp map will roughly follow this example:

>>> # instantiate and configure the worker pool
>>> from pathos.pp import ParallelPool
>>> pool = ParallelPool(nodes=4)
>>>
>>> # do a blocking map on the chosen function
>>> print(pool.map(pow, [1,2,3,4], [5,6,7,8]))
>>>
>>> # do a non-blocking map, then extract the results from the iterator
>>> results = pool.imap(pow, [1,2,3,4], [5,6,7,8])
>>> print("...")
>>> print(list(results))
>>>
>>> # do an asynchronous map, then get the results
>>> import time
>>> results = pool.amap(pow, [1,2,3,4], [5,6,7,8])
>>> while not results.ready():
...     time.sleep(5); print(".", end=' ')
...
>>> print(results.get())
>>>
>>> # do one item at a time, using a pipe
>>> print(pool.pipe(pow, 1, 5))
>>> print(pool.pipe(pow, 2, 6))
>>>
>>> # do one item at a time, using an asynchronous pipe
>>> result1 = pool.apipe(pow, 1, 5)
>>> result2 = pool.apipe(pow, 2, 6)
>>> print(result1.get())
>>> print(result2.get())

Notes

This worker pool leverages the parallelpython (pp) module, and thus has many of the limitations associated with that module. The function f and the sequences in args must be serializable. The maps in this worker pool have full functionality when run from a script, but may be somewhat limited when used in the python interpreter. Both imported and interactively-defined functions in the interpreter session may fail due to the pool failing to find the source code for the target function. For a work-around, try:

>>> # instantiate and configure the worker pool
>>> from pathos.pp import ParallelPool
>>> pool = ParallelPool(nodes=4)
>>>
>>> # wrap the function, so it can be used interactively by the pool
>>> def wrapsin(*args, **kwds):
...     from math import sin
...     return sin(*args, **kwds)
>>>
>>> # do a blocking map using the wrapped function
>>> results = pool.map(wrapsin, [1,2,3,4,5])
class ParallelPool(*args, **kwds)

Bases: pathos.abstract_launcher.AbstractWorkerPool

Mapper that leverages parallelpython (i.e. pp) maps.

Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
Other class members:
scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will autodetect processors
NOTE: if a tuple of servers is not provided, defaults to localhost only

_ParallelPool__get_nodes()

get the number of nodes used in the map

_ParallelPool__get_servers()

get the servers used in the map

_ParallelPool__set_nodes(nodes)

set the number of nodes used in the map

_ParallelPool__set_servers(servers)

set the servers used in the map

__init__(*args, **kwds)
Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
Other class members:
scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will autodetect processors
NOTE: if a tuple of servers is not provided, defaults to localhost only

__module__ = 'pathos.parallel'
__repr__() <==> repr(x)
_clear()

Remove server with matching state

_equals(server)

check if the server is compatible

_is_alive(server=None, negate=False, run=True)
_serve(nodes=None, servers=None)

Create a new server if one isn’t already initialized

amap(f, *args, **kwds)

run a batch of jobs with an asynchronous map

Returns a results object which contains the results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. To retrieve the results, call the get() method on the returned results object. The call to get() blocks until all results are retrieved. Use the ready() method on the results object to check if all results are ready.

apipe(f, *args, **kwds)

submit a job asynchronously to a queue

Returns a results object which contains the result of calling the function f on a selected worker. To retrieve the result, call the get() method on the returned results object. The call to get() blocks until the result is available. Use the ready() method on the results object to check if the result is ready.

clear()

Remove server with matching state

close()

close the pool to any new jobs

imap(f, *args, **kwds)

run a batch of jobs with a non-blocking and ordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence.

join()

cleanup the closed worker processes

map(f, *args, **kwds)

run a batch of jobs with a blocking and ordered map

Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence.

ncpus

get the number of nodes used in the map

nodes

get the number of nodes used in the map

pipe(f, *args, **kwds)

submit a job and block until results are available

Returns result of calling the function f on a selected worker. This function will block until results are available.

restart(force=False)

restart a closed pool

servers

get the servers used in the map

terminate()

a more abrupt close

uimap(f, *args, **kwds)

run a batch of jobs with a non-blocking and unordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. The order of the resulting sequence is not guaranteed.

stats(pool=None)

return a string containing stats response from the pp.Server

pools module

pools: pools of pathos workers, providing map and pipe constructs

class ParallelPool(*args, **kwds)

Bases: pathos.abstract_launcher.AbstractWorkerPool

Mapper that leverages parallelpython (i.e. pp) maps.

Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
Other class members:
scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will autodetect processors
NOTE: if a tuple of servers is not provided, defaults to localhost only

_ParallelPool__get_nodes()

get the number of nodes used in the map

_ParallelPool__get_servers()

get the servers used in the map

_ParallelPool__set_nodes(nodes)

set the number of nodes used in the map

_ParallelPool__set_servers(servers)

set the servers used in the map

__init__(*args, **kwds)
Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
Other class members:
scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will autodetect processors
NOTE: if a tuple of servers is not provided, defaults to localhost only

__module__ = 'pathos.parallel'
__repr__() <==> repr(x)
_clear()

Remove server with matching state

_equals(server)

check if the server is compatible

_is_alive(server=None, negate=False, run=True)
_serve(nodes=None, servers=None)

Create a new server if one isn’t already initialized

amap(f, *args, **kwds)

run a batch of jobs with an asynchronous map

Returns a results object which contains the results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. To retrieve the results, call the get() method on the returned results object. The call to get() blocks until all results are retrieved. Use the ready() method on the results object to check if all results are ready.

apipe(f, *args, **kwds)

submit a job asynchronously to a queue

Returns a results object which contains the result of calling the function f on a selected worker. To retrieve the result, call the get() method on the returned results object. The call to get() blocks until the result is available. Use the ready() method on the results object to check if the result is ready.

clear()

Remove server with matching state

close()

close the pool to any new jobs

imap(f, *args, **kwds)

run a batch of jobs with a non-blocking and ordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence.

join()

cleanup the closed worker processes

map(f, *args, **kwds)

run a batch of jobs with a blocking and ordered map

Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence.

ncpus

get the number of nodes used in the map

nodes

get the number of nodes used in the map

pipe(f, *args, **kwds)

submit a job and block until results are available

Returns result of calling the function f on a selected worker. This function will block until results are available.

restart(force=False)

restart a closed pool

servers

get the servers used in the map

terminate()

a more abrupt close

uimap(f, *args, **kwds)

run a batch of jobs with a non-blocking and unordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. The order of the resulting sequence is not guaranteed.

class ProcessPool(*args, **kwds)

Bases: pathos.abstract_launcher.AbstractWorkerPool

Mapper that leverages python’s multiprocessing.

Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
Other class members:
scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will autodetect processors

_ProcessPool__get_nodes()

get the number of nodes used in the map

_ProcessPool__set_nodes(nodes)

set the number of nodes used in the map

__init__(*args, **kwds)
Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
Other class members:
scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will autodetect processors

__module__ = 'pathos.multiprocessing'
__repr__() <==> repr(x)
_clear()

Remove server with matching state

_serve(nodes=None)

Create a new server if one isn’t already initialized

amap(f, *args, **kwds)

run a batch of jobs with an asynchronous map

Returns a results object which contains the results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. To retrieve the results, call the get() method on the returned results object. The call to get() blocks until all results are retrieved. Use the ready() method on the results object to check if all results are ready.

apipe(f, *args, **kwds)

submit a job asynchronously to a queue

Returns a results object which contains the result of calling the function f on a selected worker. To retrieve the result, call the get() method on the returned results object. The call to get() blocks until the result is available. Use the ready() method on the results object to check if the result is ready.

clear()

Remove server with matching state

close()

close the pool to any new jobs

imap(f, *args, **kwds)

run a batch of jobs with a non-blocking and ordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence.

join()

cleanup the closed worker processes

map(f, *args, **kwds)

run a batch of jobs with a blocking and ordered map

Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence.

ncpus

get the number of nodes used in the map

nodes

get the number of nodes used in the map

pipe(f, *args, **kwds)

submit a job and block until results are available

Returns result of calling the function f on a selected worker. This function will block until results are available.

restart(force=False)

restart a closed pool

terminate()

a more abrupt close

uimap(f, *args, **kwds)

run a batch of jobs with a non-blocking and unordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. The order of the resulting sequence is not guaranteed.

class SerialPool(*args, **kwds)

Bases: pathos.abstract_launcher.AbstractWorkerPool

Mapper that leverages standard (i.e. serial) python maps.

Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
Other class members:
scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
_SerialPool__get_nodes()

get the number of nodes in the pool

_SerialPool__set_nodes(nodes)

set the number of nodes in the pool

__module__ = 'pathos.serial'
_exiting = False
_is_alive(negate=False, run=True)
clear()

hard restart

close()

close the pool to any new jobs

imap(f, *args, **kwds)

run a batch of jobs with a non-blocking and ordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence.

join()

cleanup the closed worker processes

map(f, *args, **kwds)

run a batch of jobs with a blocking and ordered map

Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence.

nodes

get the number of nodes in the pool

pipe(f, *args, **kwds)

submit a job and block until results are available

Returns result of calling the function f on a selected worker. This function will block until results are available.

restart(force=False)

restart a closed pool

terminate()

a more abrupt close

class ThreadPool(*args, **kwds)

Bases: pathos.abstract_launcher.AbstractWorkerPool

Mapper that leverages python’s threading.

Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
Other class members:
scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will autodetect processors

_ThreadPool__get_nodes()

get the number of nodes used in the map

_ThreadPool__set_nodes(nodes)

set the number of nodes used in the map

__init__(*args, **kwds)
Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
Other class members:
scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will autodetect processors

__module__ = 'pathos.threading'
__repr__() <==> repr(x)
_clear()

Remove server with matching state

_serve(nodes=None)

Create a new server if one isn’t already initialized

amap(f, *args, **kwds)

run a batch of jobs with an asynchronous map

Returns a results object which contains the results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. To retrieve the results, call the get() method on the returned results object. The call to get() blocks until all results are retrieved. Use the ready() method on the results object to check if all results are ready.

apipe(f, *args, **kwds)

submit a job asynchronously to a queue

Returns a results object which contains the result of calling the function f on a selected worker. To retrieve the result, call the get() method on the returned results object. The call to get() blocks until the result is available. Use the ready() method on the results object to check if the result is ready.

clear()

Remove server with matching state

close()

close the pool to any new jobs

imap(f, *args, **kwds)

run a batch of jobs with a non-blocking and ordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence.

join()

cleanup the closed worker processes

map(f, *args, **kwds)

run a batch of jobs with a blocking and ordered map

Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence.

nodes

get the number of nodes used in the map

nthreads

get the number of nodes used in the map

pipe(f, *args, **kwds)

submit a job and block until results are available

Returns result of calling the function f on a selected worker. This function will block until results are available.

restart(force=False)

restart a closed pool

terminate()

a more abrupt close

uimap(f, *args, **kwds)

run a batch of jobs with a non-blocking and unordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. The order of the resulting sequence is not guaranteed.

_ProcessPool

alias of multiprocess.pool.Pool

_ThreadPool(processes=None, initializer=None, initargs=())

portpicker module

This script prints out an available port number.

class portnumber(min=0, max=65536)

Bases: object

port selector

Usage:
>>> pick = portnumber(min=1024,max=65535)
>>> print( pick() )

select a port number from a given range.

The first call will return a random number from the available range, and each subsequent call will return the next number in the range.

Inputs:
min – minimum port number [default = 0]
max – maximum port number [default = 65536]
__call__(...) <==> x(...)
__dict__ = dict_proxy({'__module__': 'pathos.portpicker', '__dict__': <attribute '__dict__' of 'portnumber' objects>, '__call__': <function __call__>, '__weakref__': <attribute '__weakref__' of 'portnumber' objects>, '__doc__': 'port selector\n\nUsage:\n >>> pick = portnumber(min=1024,max=65535)\n >>> print( pick() )\n ', '__init__': <function __init__>})
__init__(min=0, max=65536)

select a port number from a given range.

The first call will return a random number from the available range, and each subsequent call will return the next number in the range.

Inputs:
min – minimum port number [default = 0]
max – maximum port number [default = 65536]
__module__ = 'pathos.portpicker'
__weakref__

list of weak references to the object (if defined)

randomport(min=1024, max=65536)

select a random port number

Inputs:
min – minimum port number [default = 1024]
max – maximum port number [default = 65536]
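
A minimal sketch of both selectors:

>>> from pathos.portpicker import portnumber, randomport
>>> pick = portnumber(min=1024, max=65535)
>>> port = pick()        # random on the first call, sequential thereafter
>>> port = randomport()  # a single random port in the default range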

pp module

This module contains map and pipe interfaces to the parallelpython (pp) module.

Pipe methods provided:
pipe - blocking communication pipe [returns: value]
apipe - asynchronous communication pipe [returns: object]
Map methods provided:
map - blocking and ordered worker pool [returns: list]
imap - non-blocking and ordered worker pool [returns: iterator]
uimap - non-blocking and unordered worker pool [returns: iterator]
amap - asynchronous worker pool [returns: object]

Usage

A typical call to a pathos pp map will roughly follow this example:

>>> # instantiate and configure the worker pool
>>> from pathos.pp import ParallelPool
>>> pool = ParallelPool(nodes=4)
>>>
>>> # do a blocking map on the chosen function
>>> print(pool.map(pow, [1,2,3,4], [5,6,7,8]))
>>>
>>> # do a non-blocking map, then extract the results from the iterator
>>> results = pool.imap(pow, [1,2,3,4], [5,6,7,8])
>>> print("...")
>>> print(list(results))
>>>
>>> # do an asynchronous map, then get the results
>>> import time
>>> results = pool.amap(pow, [1,2,3,4], [5,6,7,8])
>>> while not results.ready():
...     time.sleep(5); print(".", end=' ')
...
>>> print(results.get())
>>>
>>> # do one item at a time, using a pipe
>>> print(pool.pipe(pow, 1, 5))
>>> print(pool.pipe(pow, 2, 6))
>>>
>>> # do one item at a time, using an asynchronous pipe
>>> result1 = pool.apipe(pow, 1, 5)
>>> result2 = pool.apipe(pow, 2, 6)
>>> print(result1.get())
>>> print(result2.get())

Notes

This worker pool leverages the parallelpython (pp) module, and thus has many of the limitations associated with that module. The function f and the sequences in args must be serializable. The maps in this worker pool have full functionality when run from a script, but may be somewhat limited when used in the python interpreter. Both imported and interactively-defined functions in the interpreter session may fail due to the pool failing to find the source code for the target function. For a work-around, try:

>>> # instantiate and configure the worker pool
>>> from pathos.pp import ParallelPool
>>> pool = ParallelPool(nodes=4)
>>>
>>> # wrap the function, so it can be used interactively by the pool
>>> def wrapsin(*args, **kwds):
...     from math import sin
...     return sin(*args, **kwds)
>>>
>>> # do a blocking map using the wrapped function
>>> results = pool.map(wrapsin, [1,2,3,4,5])
ParallelPythonPool

alias of pathos.parallel.ParallelPool

stats(pool=None)

return a string containing stats response from the pp.Server

pp_map module

minimal interface to python’s pp (parallel python) module

Implements a work-alike of the builtin map function that distributes work across many processes. As it uses ppft to do the actual parallel processing, code using this must conform to the usual ppft restrictions (arguments must be serializable, etc).

Notes

This module has been deprecated in favor of pathos.pools.

ppServer

alias of pp._pp.Server

pp_map(function, sequence, *args, **kwds)

extend python’s parallel map function to parallel python

Inputs:
function – target function
sequence – sequence to process in parallel
Additional Inputs:
ncpus – number of ‘local’ processors to use [default = ‘autodetect’]
servers – available distributed parallel python servers [default = ()]
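
A minimal sketch of the deprecated interface (pathos.pools is preferred):

>>> # run a blocking parallel map with 2 local processors
>>> from pathos.pp_map import pp_map
>>> results = pp_map(abs, [-1, 2, -3], ncpus=2)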
ppmap(processes, function, sequence, *sequences)

Split the work of ‘function’ across the given number of processes. Set ‘processes’ to None to let Parallel Python autodetect the number of children to use.

Although the calling semantics should be identical to __builtin__.map (even using __builtin__.map to process arguments), it differs in that it returns a generator instead of a list. This enables lazy evaluation of the results so that other work can be done while the subprocesses are still running.

>>> def rangetotal(n): return n, sum(range(n))
>>> list(map(rangetotal, range(1, 6)))
[(1, 0), (2, 1), (3, 3), (4, 6), (5, 10)]
>>> list(ppmap(1, rangetotal, range(1, 6)))
[(1, 0), (2, 1), (3, 3), (4, 6), (5, 10)]
print_stats(servers=None)

print stats from the pp.Server

stats(pool=None)

return a string containing stats response from the pp.Server

profile module

This module contains functions for profiling in other threads and processes.

Functions for identifying a thread/process:
process_id - get the identifier (process id) for the current process
thread_id - get the identifier for the current thread
Functions for controlling profiling:
enable_profiling - initialize a profiler in the current thread/process
start_profiling - begin profiling everything in the current thread/process
stop_profiling - stop profiling everything in the current thread/process
disable_profiling - remove the profiler from the current thread/process
Functions that control profile statistics (pstats) output:
clear_stats - clear stored pstats from the current thread/process
get_stats - get stored pstats for the current thread/process
print_stats - print stored pstats for the current thread/process
dump_stats - dump stored pstats for the current thread/process
Functions that add/remove profiling:
profiled - decorator to add profiling to a function
not_profiled - decorator to remove profiling from a function
profile - decorator for profiling a function (will enable_profiling)

Usage

Typical calls to pathos profiling will roughly follow this example:

 >>> import time
 >>> import random
 >>> import pathos.profile as pr
 >>>
 >>> # build a worker function
 >>> def _work(i):
 ...     x = random.random()
 ...     time.sleep(x)
 ...     return (i,x)
 >>>
 >>> # generate a 'profiled' work function
 >>> config = dict(gen=pr.process_id)
 >>> work = pr.profiled(**config)(_work)
 >>>
 >>> # enable profiling
 >>> pr.enable_profiling()
 >>>
 >>> # profile the work (not the map internals) in the main process
 >>> for i in map(work, range(-10,0)):
 ...     print(i)
 ...
 >>> # profile the map in the main process, and work in the other process
 >>> from pathos.helpers import mp
 >>> pool = mp.Pool(10)
 >>> _uimap = pr.profiled(**config)(pool.imap_unordered)
 >>> for i in _uimap(work, range(-10,0)):
 ...     print(i)
 ...
 >>> # deactivate all profiling
 >>> pr.disable_profiling() # in the main process
 >>> tuple(_uimap(pr.disable_profiling, range(10))) # in the workers
 >>> for i in _uimap(work, range(-20,-10)):
 ...     print(i)
 ...
 >>> # re-activate profiling
 >>> pr.enable_profiling()
 >>>
 >>> # print stats for profile of 'import math' in another process
 >>> def test_import(module):
 ...    __import__(module)
 ...
 >>> import pathos.pools as pp
 >>> pool = pp.ProcessPool(1)
 >>> pr.profile('cumulative', pipe=pool.pipe)(test_import, 'pox')
      10 function calls in 0.003 seconds

Ordered by: cumulative time

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1    0.000    0.000    0.003    0.003 <stdin>:1(test_import)
     1    0.002    0.002    0.003    0.003 {__import__}
     1    0.001    0.001    0.001    0.001 __init__.py:8(<module>)
     1    0.000    0.000    0.000    0.000 shutils.py:11(<module>)
     1    0.000    0.000    0.000    0.000 _disk.py:15(<module>)
     1    0.000    0.000    0.000    0.000 {eval}
     1    0.000    0.000    0.000    0.000 utils.py:11(<module>)
     1    0.000    0.000    0.000    0.000 <string>:1(<module>)
     1    0.000    0.000    0.000    0.000 info.py:2(<module>)
     1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}


 >>> pool.close()
 >>> pool.join()
 >>> pool.clear()

Notes

This module leverages python’s cProfile module, and is primarily a high-level interface to that module that strives to make profiling in a different thread or process easier. The use of pathos.pools is suggested, but not required (as seen in the example above).

In many cases, profiling in another thread is not necessary, and either of the following can be sufficient/better for timing and profiling:

$ python -c "import time; s=time.time(); import pathos; print(time.time()-s)"
$ python -c "import cProfile; p=cProfile.Profile(); p.enable(); import pathos; p.print_stats('cumulative')"

This module was inspired by: http://stackoverflow.com/a/32522579/4646678.

clear_stats(*args)

clear all stored profiling results from the current thread/process

disable_profiling(*args)

remove the profiler instance from the current thread/process

dump_stats(*args, **kwds)

dump all stored profiling results for the current thread/process

Notes

see pathos.profile.profiled for settings for *args and **kwds

enable_profiling(*args)

initialize a profiler instance in the current thread/process

get_stats(*args)

get all stored profiling results for the current thread/process

not_profiled(f)

decorator to remove profiling (due to ‘profiled’) from a function

print_stats(*args, **kwds)

print all stored profiling results for the current thread/process

process_id()

get the identifier (process id) for the current process

class profile(sort=None, **config)

Bases: object

decorator for profiling a function (will enable profiling)

sort is integer index of column in pstats output for sorting

Important class members:
pipe - pipe instance in which profiling is active

Example

>>> import time
>>> import random
>>> import pathos.profile as pr
>>>
>>> def work():
...     x = random.random()
...     time.sleep(x)
...     return x
...
>>> # profile the work; print pstats info
>>> pr.profile()(work)
         4 function calls in 0.136 seconds

Ordered by: standard name

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1    0.000    0.000    0.136    0.136 <stdin>:1(work)
     1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
     1    0.000    0.000    0.000    0.000 {method 'random' of '_random.Random' objects}
     1    0.136    0.136    0.136    0.136 {time.sleep}

0.1350568110491419

NOTE: the pipe provided should come from a pool built with nodes=1. Other
configuration keywords (config) are passed to ‘pr.profiled’. Output can be
ordered by setting sort to one of the following:
‘calls’ - call count
‘cumulative’ - cumulative time
‘cumtime’ - cumulative time
‘file’ - file name
‘filename’ - file name
‘module’ - file name
‘ncalls’ - call count
‘pcalls’ - primitive call count
‘line’ - line number
‘name’ - function name
‘nfl’ - name/file/line
‘stdname’ - standard name
‘time’ - internal time
‘tottime’ - internal time
__call__(...) <==> x(...)
__dict__ = dict_proxy({'__module__': 'pathos.profile', '__dict__': <attribute '__dict__' of 'profile' objects>, '__call__': <function __call__>, '__weakref__': <attribute '__weakref__' of 'profile' objects>, '__doc__': 'decorator for profiling a function (will enable profiling)', '__init__': <function __init__>})
__init__(sort=None, **config)

sort is integer index of column in pstats output for sorting

Important class members:
pipe - pipe instance in which profiling is active

Example

>>> import time
>>> import random
>>> import pathos.profile as pr
>>>
>>> def work():
...     x = random.random()
...     time.sleep(x)
...     return x
...
>>> # profile the work; print pstats info
>>> pr.profile()(work)
         4 function calls in 0.136 seconds

Ordered by: standard name

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1    0.000    0.000    0.136    0.136 <stdin>:1(work)
     1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
     1    0.000    0.000    0.000    0.000 {method 'random' of '_random.Random' objects}
     1    0.136    0.136    0.136    0.136 {time.sleep}

0.1350568110491419

NOTE: the pipe provided should come from a pool built with nodes=1. Other
configuration keywords (config) are passed to ‘pr.profiled’. Output can be
ordered by setting sort to one of the following:
‘calls’ - call count
‘cumulative’ - cumulative time
‘cumtime’ - cumulative time
‘file’ - file name
‘filename’ - file name
‘module’ - file name
‘ncalls’ - call count
‘pcalls’ - primitive call count
‘line’ - line number
‘name’ - function name
‘nfl’ - name/file/line
‘stdname’ - standard name
‘time’ - internal time
‘tottime’ - internal time
__module__ = 'pathos.profile'
__weakref__

list of weak references to the object (if defined)

class profiled(gen=None, prefix='id-', suffix='.prof')

Bases: object

decorator for profiling a function (does not call enable profiling)

y=gen(), with y an identifier (e.g. current_process().pid)

Important class members:
prefix - string prefix for pstats filename [default: ‘id-’]
suffix - string suffix for pstats filename [default: ‘.prof’]
pid - function for obtaining id of current process/thread
sort - integer index of column in pstats output for sorting

Example

>>> import time
>>> import random
>>> import pathos.profile as pr
>>>
>>> config = dict(gen=pr.process_id)
>>> @pr.profiled(**config)
... def work(i):
...     x = random.random()
...     time.sleep(x)
...     return (i,x)
...
>>> pr.enable_profiling()
>>> # profile the work (not the map internals); write to file for pstats
>>> for i in map(work, range(-10,0)):
...     print(i)
...
NOTE: If gen is a bool or string, then sort=gen and pid is not used.
Otherwise, pid=gen and sort is not used. Output can be ordered by setting
gen to one of the following:
‘calls’ - call count
‘cumulative’ - cumulative time
‘cumtime’ - cumulative time
‘file’ - file name
‘filename’ - file name
‘module’ - file name
‘ncalls’ - call count
‘pcalls’ - primitive call count
‘line’ - line number
‘name’ - function name
‘nfl’ - name/file/line
‘stdname’ - standard name
‘time’ - internal time
‘tottime’ - internal time
__call__(...) <==> x(...)
__dict__ = dict_proxy({'__module__': 'pathos.profile', '__dict__': <attribute '__dict__' of 'profiled' objects>, '__call__': <function __call__>, '__weakref__': <attribute '__weakref__' of 'profiled' objects>, '__doc__': 'decorator for profiling a function (does not call enable profiling)', '__init__': <function __init__>})
__init__(gen=None, prefix='id-', suffix='.prof')

y=gen(), with y an identifier (e.g. current_process().pid)

Important class members:
prefix - string prefix for pstats filename [default: ‘id-’]
suffix - string suffix for pstats filename [default: ‘.prof’]
pid - function for obtaining id of current process/thread
sort - integer index of column in pstats output for sorting

Example

>>> import time
>>> import random
>>> import pathos.profile as pr
>>>
>>> config = dict(gen=pr.process_id)
>>> @pr.profiled(**config)
... def work(i):
...     x = random.random()
...     time.sleep(x)
...     return (i,x)
...
>>> pr.enable_profiling()
>>> # profile the work (not the map internals); write to file for pstats
>>> for i in map(work, range(-10,0)):
...     print(i)
...
NOTE: If gen is a bool or string, then sort=gen and pid is not used.
Otherwise, pid=gen and sort is not used. Output can be ordered by setting
gen to one of the following:
    ‘calls’ - call count
    ‘cumulative’ - cumulative time
    ‘cumtime’ - cumulative time
    ‘file’ - file name
    ‘filename’ - file name
    ‘module’ - file name
    ‘ncalls’ - call count
    ‘pcalls’ - primitive call count
    ‘line’ - line number
    ‘name’ - function name
    ‘nfl’ - name/file/line
    ‘stdname’ - standard name
    ‘time’ - internal time
    ‘tottime’ - internal time
__module__ = 'pathos.profile'
__weakref__

list of weak references to the object (if defined)
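
Since profiled writes pstats data to file (prefix + id + suffix), the results can be inspected afterward with the standard pstats module. A minimal sketch, where ‘id-12345.prof’ is a hypothetical name for one of the generated files:

>>> import pstats
>>> # 'id-12345.prof' is a placeholder for a generated profile file
>>> stats = pstats.Stats('id-12345.prof')
>>> stats.sort_stats('cumtime').print_stats()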

start_profiling(*args)

begin profiling everything in the current thread/process

stop_profiling(*args)

stop profiling everything in the current thread/process
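
A minimal sketch, bracketing a block of work with these two functions:

>>> import pathos.profile as pr
>>> pr.start_profiling()
>>> # ... code to be profiled in the current thread/process ...
>>> pr.stop_profiling()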

thread_id()

get the identifier for the current thread

python module

This module contains map and pipe interfaces to standard (i.e. serial) python.

Pipe methods provided:
pipe - blocking communication pipe [returns: value]
Map methods provided:
map - blocking and ordered worker pool [returns: list]
imap - non-blocking and ordered worker pool [returns: iterator]

Usage

A typical call to a pathos python map will roughly follow this example:

>>> # instantiate and configure the worker pool
>>> from pathos.serial import SerialPool
>>> pool = SerialPool()
>>>
>>> # do a blocking map on the chosen function
>>> print(pool.map(pow, [1,2,3,4], [5,6,7,8]))
>>>
>>> # do a non-blocking map, then extract the results from the iterator
>>> results = pool.imap(pow, [1,2,3,4], [5,6,7,8])
>>> print("...")
>>> print(list(results))
>>>
>>> # do one item at a time, using a pipe
>>> print(pool.pipe(pow, 1, 5))
>>> print(pool.pipe(pow, 2, 6))

Notes

This worker pool leverages the built-in python maps, and thus does not have limitations due to serialization of the function f or the sequences in args. The maps in this worker pool have full functionality whether run from a script or in the python interpreter, and work reliably for both imported and interactively-defined functions.

PythonSerial

alias of pathos.serial.SerialPool
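
A quick check that the alias resolves to the same class (assuming the module is importable as pathos.python):

>>> from pathos.python import PythonSerial
>>> from pathos.serial import SerialPool
>>> PythonSerial is SerialPool
True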

secure module

class Copier(name=None, **kwds)

Bases: pathos.connection.Pipe

a popen-based copier for parallel and distributed computing.

create a copier

Inputs:
name: a unique identifier (string) for the launcher
source: hostname:path of original [user@host:path is also valid]
destination: hostname:path for copy [user@host:path is also valid]
launcher: remote service mechanism (i.e. scp, cp) [default = ‘scp’]
options: remote service options (i.e. -v, -P) [default = ‘’]
background: run in background [default = False]
decode: ensure response is ‘ascii’ [default = True]
stdin: file-like object to serve as standard input for the remote process
__call__(**kwds)

configure the copier using given keywords:

(Re)configure the copier for the following inputs:
source: hostname:path of original [user@host:path is also valid]
destination: hostname:path for copy [user@host:path is also valid]
launcher: remote service mechanism (i.e. scp, cp) [default = ‘scp’]
options: remote service options (i.e. -v, -P) [default = ‘’]
background: run in background [default = False]
decode: ensure response is ‘ascii’ [default = True]
stdin: file-like object to serve as standard input for the remote process
__init__(name=None, **kwds)

create a copier

Inputs:
name: a unique identifier (string) for the launcher
source: hostname:path of original [user@host:path is also valid]
destination: hostname:path for copy [user@host:path is also valid]
launcher: remote service mechanism (i.e. scp, cp) [default = ‘scp’]
options: remote service options (i.e. -v, -P) [default = ‘’]
background: run in background [default = False]
decode: ensure response is ‘ascii’ [default = True]
stdin: file-like object to serve as standard input for the remote process
__module__ = 'pathos.secure.copier'
config(**kwds)

configure the copier using given keywords:

(Re)configure the copier for the following inputs:
source: hostname:path of original [user@host:path is also valid]
destination: hostname:path for copy [user@host:path is also valid]
launcher: remote service mechanism (i.e. scp, cp) [default = ‘scp’]
options: remote service options (i.e. -v, -P) [default = ‘’]
background: run in background [default = False]
decode: ensure response is ‘ascii’ [default = True]
stdin: file-like object to serve as standard input for the remote process
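
A hedged sketch of a typical copy, assuming the launch and response methods inherited from pathos.connection.Pipe; the host and paths are placeholders:

>>> from pathos.secure import Copier
>>> copier = Copier('copier')
>>> copier(source='foo.txt', destination='user@remote.host:~/foo.txt')
>>> copier.launch()           # execute the (scp) copy
>>> print(copier.response())
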
class Pipe(name=None, **kwds)

Bases: pathos.connection.Pipe

a popen-based ssh-pipe for parallel and distributed computing.

create an ssh pipe

Inputs:
name: a unique identifier (string) for the pipe
host: hostname to receive command [user@host is also valid]
command: a command to send [default = ‘echo <name>’]
launcher: remote service mechanism (i.e. ssh, rsh) [default = ‘ssh’]
options: remote service options (i.e. -v, -N, -L) [default = ‘’]
background: run in background [default = False]
decode: ensure response is ‘ascii’ [default = True]
stdin: file-like object to serve as standard input for the remote process
__call__(**kwds)

configure a remote command using given keywords:

(Re)configure the pipe for the following inputs:
host: hostname to receive command [user@host is also valid]
command: a command to send [default = ‘echo <name>’]
launcher: remote service mechanism (i.e. ssh, rsh) [default = ‘ssh’]
options: remote service options (i.e. -v, -N, -L) [default = ‘’]
background: run in background [default = False]
decode: ensure response is ‘ascii’ [default = True]
stdin: file-like object to serve as standard input for the remote process
__init__(name=None, **kwds)

create an ssh pipe

Inputs:
name: a unique identifier (string) for the pipe
host: hostname to receive command [user@host is also valid]
command: a command to send [default = ‘echo <name>’]
launcher: remote service mechanism (i.e. ssh, rsh) [default = ‘ssh’]
options: remote service options (i.e. -v, -N, -L) [default = ‘’]
background: run in background [default = False]
decode: ensure response is ‘ascii’ [default = True]
stdin: file-like object to serve as standard input for the remote process
__module__ = 'pathos.secure.connection'
config(**kwds)

configure a remote command using given keywords:

(Re)configure the pipe for the following inputs:
host: hostname to receive command [user@host is also valid]
command: a command to send [default = ‘echo <name>’]
launcher: remote service mechanism (i.e. ssh, rsh) [default = ‘ssh’]
options: remote service options (i.e. -v, -N, -L) [default = ‘’]
background: run in background [default = False]
decode: ensure response is ‘ascii’ [default = True]
stdin: file-like object to serve as standard input for the remote process
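
Similarly, a hedged sketch of sending a command over ssh, again assuming the inherited launch and response methods and a placeholder host:

>>> from pathos.secure import Pipe
>>> p = Pipe('remote')
>>> p(host='user@remote.host', command='hostname')
>>> p.launch()                # execute the command via ssh
>>> print(p.response())
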
class Tunnel(name=None, **kwds)

Bases: object

an ssh-tunnel launcher for parallel and distributed computing.

create an ssh tunnel launcher

Inputs:
name – a unique identifier (string) for the launcher
MAXPORT = 65535
MINPORT = 1024
_Tunnel__disconnect()

disconnect tunnel internals

__dict__ = dict_proxy({'__module__': 'pathos.secure.tunnel', '_connect': <function _connect>, 'MAXPORT': 65535, 'verbose': True, '_Tunnel__disconnect': <function __disconnect>, 'connect': <function connect>, '__dict__': <attribute '__dict__' of 'Tunnel' objects>, 'disconnect': <function disconnect>, 'MINPORT': 1024, '__weakref__': <attribute '__weakref__' of 'Tunnel' objects>, '__doc__': 'a ssh-tunnel launcher for parallel and distributed computing.', '__init__': <function __init__>, '__repr__': <function __repr__>})
__init__(name=None, **kwds)

create an ssh tunnel launcher

Inputs:
name – a unique identifier (string) for the launcher
__module__ = 'pathos.secure.tunnel'
__repr__() <==> repr(x)
__weakref__

list of weak references to the object (if defined)

_connect(localport, remotehost, remoteport, through=None)
connect(host, port=None, through=None)

establish a secure shell tunnel between local and remote host

Input:
host – remote hostname [user@host is also valid]
port – remote port number
Additional Input:
through – ‘tunnel-through’ hostname [default = None]
disconnect()

destroy the ssh tunnel

verbose = True
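
A minimal sketch of tunnel use, assuming connect returns the selected local port; the host and port below are placeholders:

>>> from pathos.secure import Tunnel
>>> t = Tunnel('tunnel')
>>> localport = t.connect('user@remote.host', 5672)
>>> # ... communicate with remote.host:5672 via localhost:localport ...
>>> t.disconnect()
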
exception TunnelException

Bases: exceptions.Exception

Exception for failure to establish ssh tunnel

__module__ = 'pathos.secure.tunnel'
__weakref__

list of weak references to the object (if defined)

selector module

This module implements a selector class, which can be used to dispatch events and for event handler wrangling.

class Selector

Bases: object

Selector object for watching and event notification.

Takes no initial input.

_TIMEOUT = 0.5
__dict__ = dict_proxy({'__module__': 'pathos.selector', 'notifyOnReadReady': <function notifyOnReadReady>, '_watch': <function _watch>, '_debug': <logging.Logger object>, 'watch': <function watch>, 'notifyOnWriteReady': <function notifyOnWriteReady>, 'notifyOnInterrupt': <function notifyOnInterrupt>, '__dict__': <attribute '__dict__' of 'Selector' objects>, '_cleanup': <function _cleanup>, 'notifyOnException': <function notifyOnException>, '_TIMEOUT': 0.5, '_dispatch': <function _dispatch>, '__weakref__': <attribute '__weakref__' of 'Selector' objects>, '__doc__': '\nSelector object for watching and event notification.\n ', '__init__': <function __init__>, 'notifyWhenIdle': <function notifyWhenIdle>})
__init__()

Takes no initial input.

__module__ = 'pathos.selector'
__weakref__

list of weak references to the object (if defined)

_cleanup()
_debug = <logging.Logger object>
_dispatch(handlers, entities)
_watch()
notifyOnException(fd, handler)

add <handler> to the list of routines to call when <fd> raises an exception

notifyOnInterrupt(handler)

add <handler> to the list of routines to call when a signal arrives

notifyOnReadReady(fd, handler)

add <handler> to the list of routines to call when <fd> is read ready

notifyOnWriteReady(fd, handler)

add <handler> to the list of routines to call when <fd> is write ready

notifyWhenIdle(handler)

add <handler> to the list of routines to call when a timeout occurs

watch(timeout=None)

dispatch events to the registered handlers
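
A hedged sketch of handler registration and dispatch; the handler signatures below (taking the selector and the watched descriptor, and returning True to remain registered) are assumptions, not a documented contract:

>>> import sys
>>> from pathos.selector import Selector
>>> s = Selector()
>>> def onRead(selector, fd):   # assumed signature
...     print('read ready')
...     return True             # assumed: True keeps the handler registered
...
>>> def onIdle(selector):       # assumed signature
...     print('idle')
...     return False
...
>>> s.notifyOnReadReady(sys.stdin, onRead)
>>> s.notifyWhenIdle(onIdle)
>>> s.watch(2.0)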

serial module

This module contains map and pipe interfaces to standard (i.e. serial) python.

Pipe methods provided:
pipe - blocking communication pipe [returns: value]
Map methods provided:
map - blocking and ordered worker pool [returns: list]
imap - non-blocking and ordered worker pool [returns: iterator]

Usage

A typical call to a pathos python map will roughly follow this example:

>>> # instantiate and configure the worker pool
>>> from pathos.serial import SerialPool
>>> pool = SerialPool()
>>>
>>> # do a blocking map on the chosen function
>>> print(pool.map(pow, [1,2,3,4], [5,6,7,8]))
>>>
>>> # do a non-blocking map, then extract the results from the iterator
>>> results = pool.imap(pow, [1,2,3,4], [5,6,7,8])
>>> print("...")
>>> print(list(results))
>>>
>>> # do one item at a time, using a pipe
>>> print(pool.pipe(pow, 1, 5))
>>> print(pool.pipe(pow, 2, 6))

Notes

This worker pool leverages the built-in python maps, and thus does not have limitations due to serialization of the function f or the sequences in args. The maps in this worker pool have full functionality whether run from a script or in the python interpreter, and work reliably for both imported and interactively-defined functions.

class SerialPool(*args, **kwds)

Bases: pathos.abstract_launcher.AbstractWorkerPool

Mapper that leverages standard (i.e. serial) python maps.

Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
Other class members:
scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
_SerialPool__get_nodes()

get the number of nodes in the pool

_SerialPool__set_nodes(nodes)

set the number of nodes in the pool

__module__ = 'pathos.serial'
_exiting = False
_is_alive(negate=False, run=True)
clear()

hard restart

close()

close the pool to any new jobs

imap(f, *args, **kwds)

run a batch of jobs with a non-blocking and ordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence.

join()

cleanup the closed worker processes

map(f, *args, **kwds)

run a batch of jobs with a blocking and ordered map

Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence.

nodes

get the number of nodes in the pool

pipe(f, *args, **kwds)

submit a job and block until results are available

Returns result of calling the function f on a selected worker. This function will block until results are available.

restart(force=False)

restart a closed pool

terminate()

a more abrupt close
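
A short sketch of the pool lifecycle methods described above (assuming map returns a list, as documented):

>>> from pathos.serial import SerialPool
>>> pool = SerialPool()
>>> pool.map(pow, [1,2,3,4], [5,6,7,8])
[1, 64, 2187, 65536]
>>> pool.close()      # close the pool to new jobs
>>> pool.join()       # cleanup the closed workers
>>> pool.restart()    # reopen the closed pool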

server module

This module contains the base class for pathos servers, and describes the pathos server interface. If a third-party RPC server is selected, such as ‘parallel python’ (i.e. ‘pp’) or ‘RPyC’, direct calls to the third-party interface are currently used.

class Server

Bases: object

Server base class for pathos servers for parallel and distributed computing.

Takes no initial input.

__dict__ = dict_proxy({'__dict__': <attribute '__dict__' of 'Server' objects>, '__module__': 'pathos.server', 'activate': <function activate>, 'deactivate': <function deactivate>, '__weakref__': <attribute '__weakref__' of 'Server' objects>, 'serve': <function serve>, '__doc__': '\nServer base class for pathos servers for parallel and distributed computing.\n ', '__init__': <function __init__>, 'selector': <function selector>})
__init__()

Takes no initial input.

__module__ = 'pathos.server'
__weakref__

list of weak references to the object (if defined)

activate(onTimeout=None, selector=None)

configure the selector and install the timeout callback

deactivate()

turn off the selector

selector()

get the selector

serve(timeout)

begin serving, and set the timeout

threading module

This module contains map and pipe interfaces to python’s threading module.

Pipe methods provided:
pipe - blocking communication pipe [returns: value]
apipe - asynchronous communication pipe [returns: object]
Map methods provided:
map - blocking and ordered worker pool [returns: list]
imap - non-blocking and ordered worker pool [returns: iterator]
uimap - non-blocking and unordered worker pool [returns: iterator]
amap - asynchronous worker pool [returns: object]

Usage

A typical call to a pathos threading map will roughly follow this example:

>>> # instantiate and configure the worker pool
>>> from pathos.threading import ThreadPool
>>> pool = ThreadPool(nodes=4)
>>>
>>> # do a blocking map on the chosen function
>>> print(pool.map(pow, [1,2,3,4], [5,6,7,8]))
>>>
>>> # do a non-blocking map, then extract the results from the iterator
>>> results = pool.imap(pow, [1,2,3,4], [5,6,7,8])
>>> print("...")
>>> print(list(results))
>>>
>>> # do an asynchronous map, then get the results
>>> results = pool.amap(pow, [1,2,3,4], [5,6,7,8])
>>> while not results.ready():
...     time.sleep(5); print(".", end=' ')
...
>>> print(results.get())
>>>
>>> # do one item at a time, using a pipe
>>> print(pool.pipe(pow, 1, 5))
>>> print(pool.pipe(pow, 2, 6))
>>>
>>> # do one item at a time, using an asynchronous pipe
>>> result1 = pool.apipe(pow, 1, 5)
>>> result2 = pool.apipe(pow, 2, 6)
>>> print(result1.get())
>>> print(result2.get())

Notes

This worker pool leverages python’s multiprocessing.dummy module, and thus has many of the limitations associated with that module. The function f and the sequences in args must be serializable. The maps in this worker pool have full functionality whether run from a script or in the python interpreter, and work reliably for both imported and interactively-defined functions. Unlike python’s multiprocessing.dummy module, pathos.threading maps can directly utilize functions that require multiple arguments.
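
For example, a small sketch of the unordered map, which yields each result as its worker finishes rather than in input order:

>>> from pathos.threading import ThreadPool
>>> pool = ThreadPool(nodes=4)
>>> # results may arrive in any order
>>> for x in pool.uimap(pow, [1,2,3,4], [5,6,7,8]):
...     print(x)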

class ThreadPool(*args, **kwds)

Bases: pathos.abstract_launcher.AbstractWorkerPool

Mapper that leverages python’s threading.

Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
Other class members:
scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will autodetect processors

_ThreadPool__get_nodes()

get the number of nodes used in the map

_ThreadPool__set_nodes(nodes)

set the number of nodes used in the map

__init__(*args, **kwds)
Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
Other class members:
scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler

NOTE: if number of nodes is not given, will autodetect processors

__module__ = 'pathos.threading'
__repr__() <==> repr(x)
_clear()

Remove server with matching state

_serve(nodes=None)

Create a new server if one isn’t already initialized

amap(f, *args, **kwds)

run a batch of jobs with an asynchronous map

Returns a results object which contains the results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. To retrieve the results, call the get() method on the returned results object. The call to get() is blocking, until all results are retrieved. Use the ready() method on the result object to check if all results are ready.

apipe(f, *args, **kwds)

submit a job asynchronously to a queue

Returns a results object which contains the result of calling the function f on a selected worker. To retrieve the results, call the get() method on the returned results object. The call to get() is blocking, until the result is available. Use the ready() method on the results object to check if the result is ready.

clear()

Remove server with matching state

close()

close the pool to any new jobs

imap(f, *args, **kwds)

run a batch of jobs with a non-blocking and ordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence.

join()

cleanup the closed worker processes

map(f, *args, **kwds)

run a batch of jobs with a blocking and ordered map

Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence.

nodes

get the number of nodes used in the map

nthreads

get the number of nodes used in the map

pipe(f, *args, **kwds)

submit a job and block until results are available

Returns result of calling the function f on a selected worker. This function will block until results are available.

restart(force=False)

restart a closed pool

terminate()

a more abrupt close

uimap(f, *args, **kwds)

run a batch of jobs with a non-blocking and unordered map

Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. The order of the resulting sequence is not guaranteed.

_ThreadPool(processes=None, initializer=None, initargs=())

util module

utilities for distributed computing

_b(string, codec=None)

convert string to bytes using the given codec (default is ‘ascii’)

_str(byte, codec=None)

convert bytes to string using the given codec (default is ‘ascii’)

print_exc_info()

thread-safe return of string from print_exception call

spawn(onParent, onChild)

a unidirectional fork wrapper

Calls onParent(pid, fromchild) in parent process,
onChild(pid, toparent) in child process.
spawn2(onParent, onChild)

a bidirectional fork wrapper

Calls onParent(pid, fromchild, tochild) in parent process,
onChild(pid, fromparent, toparent) in child process.
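
A minimal sketch of spawn, assuming fromchild and toparent are file-like pipe ends (as the names suggest); the message is illustrative:

>>> import os
>>> from pathos.util import spawn
>>>
>>> def onParent(pid, fromchild):
...     print(fromchild.readline().rstrip())  # read the child's message
...     os.waitpid(pid, 0)                    # reap the child process
...
>>> def onChild(pid, toparent):
...     toparent.write('hello from child\n')
...     toparent.flush()
...     os._exit(0)                           # exit the child immediately
...
>>> spawn(onParent, onChild)
hello from child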

xmlrpc module

class XMLRPCRequestHandler(server, socket)

Bases: BaseHTTPServer.BaseHTTPRequestHandler

create an XML-RPC request handler

Override BaseHTTPRequestHandler.__init__(): we need to be able to have (potentially) multiple handler objects at a given time.

Inputs:
server – server object to handle requests for
socket – socket connection
__init__(server, socket)

Override BaseHTTPRequestHandler.__init__(): we need to be able to have (potentially) multiple handler objects at a given time.

Inputs:
server – server object to handle requests for
socket – socket connection
__module__ = 'pathos.xmlrpc.server'
_debug = <logging.Logger object>
_sendResponse(response)

Write the XML-RPC response

do_POST()

Access point from HTTP handler

log_message(format, *args)

Overriding BaseHTTPRequestHandler.log_message()

class XMLRPCServer(host, port)

Bases: pathos.server.Server, SimpleXMLRPCServer.SimpleXMLRPCDispatcher

extends base pathos server to an XML-RPC dispatcher

create an XML-RPC server

Takes two initial inputs:
host – hostname of XML-RPC server host
port – port number for server requests
__init__(host, port)

create an XML-RPC server

Takes two initial inputs:
host – hostname of XML-RPC server host
port – port number for server requests
__module__ = 'pathos.xmlrpc.server'
_handleMessageFromChild(selector, fd)

handler for message from a child process

_installSocket(host, port)

prepare a listening socket

_marshaled_dispatch(data, dispatch_method=None)

override SimpleXMLRPCDispatcher._marshaled_dispatch() fault string

_onConnection(selector, fd)

upon socket connection, establish a request handler

_onSelectorIdle(selector)

something to do when there are no requests

_onSocketConnection(socket)

upon socket connections, establish a request handler

_registerChild(pid, fromchild)

register a child process so it can be retrieved on select events

_unRegisterChild(fd)

remove a child process from active process register

activate()

install callbacks

serve()

enter the select loop… and wait for service requests
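
Putting the pieces together, a hedged sketch of standing up a server; register_function comes from the SimpleXMLRPCDispatcher base, the import path assumes XMLRPCServer is exported at pathos.xmlrpc, and the host/port are placeholders:

>>> from pathos.xmlrpc import XMLRPCServer
>>> server = XMLRPCServer('localhost', 8000)
>>> server.register_function(pow)   # from SimpleXMLRPCDispatcher
>>> server.activate()               # install callbacks
>>> server.serve()                  # enter the select loop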