pathos module documentation
abstract_launcher module
This module contains the base classes for pathos pool and pipe objects, and describes the map and pipe interfaces. A pipe is defined as a connection between two ‘nodes’, where a node is something that does work. A pipe may be a one-way or two-way connection. A map is defined as a one-to-many connection between nodes. In both map and pipe connections, results from the connected nodes can be returned to the calling node. There are several variants of pipe and map, such as whether the connection is blocking, ordered, or asynchronous. For pipes, derived classes must overwrite the ‘pipe’ method, while for maps, derived classes must overwrite the ‘map’ method. Pipes and maps are available from worker pool objects, where the work is done by any of the workers in the pool. For more specific point-to-point connections (such as a pipe between two specific compute nodes), use the pipe object directly.
Usage
A typical call to a pathos map will roughly follow this example:
>>> import time
>>>
>>> # instantiate and configure the worker pool
>>> from pathos.pools import ProcessPool
>>> pool = ProcessPool(nodes=4)
>>>
>>> # do a blocking map on the chosen function
>>> results = pool.map(pow, [1,2,3,4], [5,6,7,8])
>>>
>>> # do a non-blocking map, then extract the results from the iterator
>>> results = pool.imap(pow, [1,2,3,4], [5,6,7,8])
>>> print("...")
>>> results = list(results)
>>>
>>> # do an asynchronous map, then get the results
>>> results = pool.amap(pow, [1,2,3,4], [5,6,7,8])
>>> while not results.ready():
... time.sleep(5); print(".", end=' ')
...
>>> results = results.get()
Notes
Each of the pathos worker pools relies on a different transport protocol (e.g. threads, multiprocessing, etc.), where the use of each pool comes with a few caveats. See the usage documentation and examples for each worker pool for more information.
- class AbstractPipeConnection(*args, **kwds)
Bases:
object
AbstractPipeConnection base class for pathos pipes.
- Required input:
???
- Additional inputs:
???
- Important class members:
???
- Other class members:
???
- __repr__()
Return repr(self).
- class AbstractWorkerPool(*args, **kwds)
Bases:
object
AbstractWorkerPool base class for pathos pools.
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
- __enter__()
- __exit__(*args)
- __get_nodes()
get the number of nodes in the pool
- __imap(f, *args, **kwds)
default filter for imap inputs
- __init(*args, **kwds)
default filter for __init__ inputs
- __map(f, *args, **kwds)
default filter for map inputs
- __nodes = 1
- __pipe(f, *args, **kwds)
default filter for pipe inputs
- __repr__()
Return repr(self).
- __set_nodes(nodes)
set the number of nodes in the pool
- _serve(*args, **kwds)
Create a new server if one isn’t already initialized
- amap(f, *args, **kwds)
run a batch of jobs with an asynchronous map
Returns a results object which contains the results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. To retrieve the results, call the get() method on the returned results object. The call to get() blocks until all results are retrieved. Use the ready() method on the results object to check if all results are ready. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- apipe(f, *args, **kwds)
submit a job asynchronously to a queue
Returns a results object which contains the result of calling the function f on a selected worker. To retrieve the result, call the get() method on the returned results object. The call to get() blocks until the result is available. Use the ready() method on the results object to check if the result is ready.
- clear()
Remove server with matching state
- imap(f, *args, **kwds)
run a batch of jobs with a non-blocking and ordered map
Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- map(f, *args, **kwds)
run a batch of jobs with a blocking and ordered map
Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- pipe(f, *args, **kwds)
submit a job and block until results are available
Returns result of calling the function f on a selected worker. This function will block until results are available.
- uimap(f, *args, **kwds)
run a batch of jobs with a non-blocking and unordered map
Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. The order of the resulting sequence is not guaranteed. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
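For example, a minimal sketch of an unordered map on a small ThreadPool; since arrival order is not guaranteed, the results are sorted for display:
>>> from pathos.pools import ThreadPool
>>> tpool = ThreadPool(nodes=2)
>>> # uimap yields results as workers finish, so sort for a stable view
>>> sorted(tpool.uimap(pow, [1,2,3], [2,2,2]))
[1, 4, 9]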
connection module
This module contains the base class for popen pipes, and describes the popen pipe interface. The ‘config’ method can be overwritten for pipe customization. The pipe’s ‘launch’ method can be overwritten with a derived pipe’s new execution algorithm. See the following for an example of standard use.
Usage
A typical call to a popen ‘pipe’ will roughly follow this example:
>>> # instantiate the pipe
>>> pipe = Pipe()
>>>
>>> # configure the pipe to stage the command
>>> pipe(command='hostname')
>>>
>>> # execute the launch and retrieve the response
>>> pipe.launch()
>>> print(pipe.response())
- class Pipe(name=None, **kwds)
Bases:
object
a popen-based pipe for parallel and distributed computing
create a popen-pipe
- Inputs:
name: a unique identifier (string) for the pipe
command: a command to send [default = ‘echo <name>’]
background: run in background [default = False]
decode: ensure response is ‘ascii’ [default = True]
stdin: file-like object to serve as standard input for the remote process
- __call__(**kwds)
configure the pipe using given keywords
- (Re)configure the pipe for the following inputs:
command: a command to send [default = ‘echo <name>’]
background: run in background [default = False]
decode: ensure response is ‘ascii’ [default = True]
stdin: file-like object to serve as standard input for the remote process
- __repr__()
Return repr(self).
- _debug = <Logger pathos (WARNING)>
- _execute()
- config(**kwds)
configure the pipe using given keywords
- (Re)configure the pipe for the following inputs:
command: a command to send [default = ‘echo <name>’]
background: run in background [default = False]
decode: ensure response is ‘ascii’ [default = True]
stdin: file-like object to serve as standard input for the remote process
- kill()
terminate the pipe
- launch()
launch a configured command
- pid()
get pipe pid
- response()
Return the response from the launched process. Return None if no response was received yet from a background process.
- verbose = True
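For example, a minimal sketch of a background pipe (the identifier and command here are illustrative):
>>> from pathos.connection import Pipe
>>> pipe = Pipe('greeter')
>>> # stage a command that will run in the background
>>> pipe(command='echo hello', background=True)
>>> pipe.launch()
>>> # response() returns None until the background process has responded
>>> print(pipe.response())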
core module
high-level programming interface to core pathos utilities
- connect(host, port=None, through=None)
establish a secure tunnel connection to a remote host at the given port
- Parameters:
host -- hostname to which a tunnel should be established
port -- port number
through -- ‘tunnel-through’ hostname [default = None]
- copy(source, destination=None, **kwds)
copy source to (possibly) remote destination
Execute a copy, and return the copier. Use ‘kill’ to kill the copier, and ‘pid’ to get the process id for the copier.
- Parameters:
source -- path string of source ‘file’
destination -- path string for destination target
- execute(command, host=None, bg=True, **kwds)
execute a command (possibly) on a remote host
Execute a process, and return the launcher. Use ‘response’ to retrieve the response from the executed command. Use ‘kill’ to kill the launcher, and ‘pid’ to get the process id for the launcher.
- Parameters:
command -- command string to be executed
host -- hostname of execution target [default = None]
bg -- run as background process? [default = True]
- getchild(pid=None, host=None, group=False)
get all child process ids for the given parent process id (ppid)
If pid is None, the pid of the __main__ python instance will be used.
- Parameters:
group -- get process ids for the parent group id
host -- hostname where process is running
pid -- process id
- getpid(target=None, host=None, all=False, **kwds)
get the process id for a target process (possibly) running on remote host
This method should only be used as a last-ditch effort to find a process id. This method __may__ work when a child has been spawned and the pid was not registered… but there’s no guarantee.
If target is None, then get the process id of the __main__ python instance.
- Parameters:
target -- string name of target process
host -- hostname where process is running
all -- get all resulting lines from query? [default = False]
- getppid(pid=None, host=None, group=False)
get parent process id (ppid) for the given process
If pid is None, the pid of the __main__ python instance will be used.
- Parameters:
group -- get parent group id
host -- hostname where process is running
pid -- process id
- kill(pid, host=None, **kwds)
kill a process (possibly) on a remote host
- Parameters:
pid -- process id
host -- hostname where process is running [default = None]
- randomport(host=None)
select an open port on a (possibly) remote host
- Parameters:
host -- hostname on which to select an open port
- serve(server, host=None, port=None, profile='.bash_profile')
begin serving RPC requests
- Parameters:
server – name of RPC server (i.e. ‘ppserver’)
host – hostname on which a server should be launched
port – port number (on host) at which the server will accept requests
profile – file to configure the user’s environment [default=’.bash_profile’]
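For example, a minimal sketch that runs locally (host=None by default); the echoed string is illustrative:
>>> from pathos.core import execute, randomport
>>> # run a command in the foreground, then retrieve its output
>>> launcher = execute('echo hello', bg=False)
>>> print(launcher.response())
>>>
>>> # select an open port on localhost
>>> port = randomport()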
helpers module
- ProcessPool
alias of Pool
- ThreadPool(processes=None, initializer=None, initargs=())
- cpu_count()
Returns the number of CPUs in the system
- freeze_support()
Check whether this is a fake forked process in a frozen executable. If so then run code specified by commandline and exit.
- shutdown(type=None)
destroy all cached pools (of the given type)
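For example, a minimal sketch (the detected CPU count depends on the machine):
>>> from pathos.helpers import cpu_count, shutdown
>>> ncpus = cpu_count()  # number of CPUs detected on this machine
>>> shutdown()           # destroy all cached pools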
hosts module
high-level programming interface to pathos host registry
- _profiles = {}
For example, to register two ‘known’ host profiles:
_profiles = {'foobar.danse.us': '.profile', 'computer.cacr.caltech.edu': '.cshrc'}
- get_profile(rhost, assume=True)
get the default $PROFILE for a remote host
- get_profiles()
get $PROFILE for each registered host
- register(rhost, profile=None)
register a host and $PROFILE
- register_profiles(profiles)
add dict of {‘host’:$PROFILE} to registered host profiles
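For example, using the profiles shown above:
>>> import pathos.hosts as hosts
>>> hosts.register('foobar.danse.us', profile='.profile')
>>> hosts.register_profiles({'computer.cacr.caltech.edu': '.cshrc'})
>>> hosts.get_profile('foobar.danse.us')
'.profile'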
maps module
maps: stand-alone map-like objects using lazy pool instantiation
- class Amap(pool=None, *args, **kwds)
Bases:
Map
async map instance with internal lazy pool instantiation
- Parameters:
pool – pool object (i.e. pathos.pools.ProcessPool)
*args – positional arguments for pool initialization
**kwds – keyword arguments for pool initialization
close – if True, close the pool to any new jobs [Default: False]
join – if True, reclaim the pool’s closed workers [Default: False]
clear – if True, delete the pool singleton [Default: False]
- NOTE: if a pool object is not provided, NotImplemented is returned upon use.
- NOTE: pools from both multiprocess and pathos.pools can be used, however the behavior is slightly different. Pools from both pathos and multiprocess have close and join methods, to close the pool to new jobs and to shut down the pool’s workers. Pools from pathos, however, are launched as singletons, so they also include a clear method that deletes the singleton. In either case, a pool that has been “closed” will throw a ValueError if map is then called, and similarly, a ValueError will be thrown if join is called before a pool is “closed”. The major difference is that if a pathos.pool is closed, the map instance cannot run new jobs until “clear” is called, while a new multiprocess pool will be created each time the map is executed. This leads to pathos.pools generally being called with either clear=True or clear=False, and pools from multiprocess either using close=True or join=True or both. Some hierarchical parallel workflows are not allowed, and will result in an error being thrown; however, changing close, join, or clear can often remove the error.
- __call__(func, *args, **kwds)
instantiate a pool and execute the pool’s async map
- Parameters:
func – function object to map
*args – positional arguments for async map
**kwds – keyword arguments for async map
- Returns:
results from execution of async map(func, *args, **kwds)
NOTE: initializes a new worker pool with each call
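A minimal sketch of Amap, assuming the pool class is passed in for lazy instantiation (per the lazy-instantiation behavior described above):
>>> from pathos.maps import Amap
>>> from pathos.pools import ProcessPool
>>> amap = Amap(ProcessPool, 4)
>>> result = amap(pow, [1,2,3,4], [5,6,7,8])
>>> result.get()  # blocks until the async results are ready
>>> # tidy up the pool singleton when done
>>> amap.close(); amap.join(); amap.clear()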
- class Asmap(pool=None, *args, **kwds)
Bases:
Map
async starmap instance with internal lazy pool instantiation
- Parameters:
pool – pool object (i.e. pathos.pools.ProcessPool)
*args – positional arguments for pool initialization
**kwds – keyword arguments for pool initialization
close – if True, close the pool to any new jobs [Default: False]
join – if True, reclaim the pool’s closed workers [Default: False]
clear – if True, delete the pool singleton [Default: False]
- NOTE: if a pool object is not provided, NotImplemented is returned upon use.
- NOTE: pools from both multiprocess and pathos.pools can be used, however the behavior is slightly different. Pools from both pathos and multiprocess have close and join methods, to close the pool to new jobs and to shut down the pool’s workers. Pools from pathos, however, are launched as singletons, so they also include a clear method that deletes the singleton. In either case, a pool that has been “closed” will throw a ValueError if map is then called, and similarly, a ValueError will be thrown if join is called before a pool is “closed”. The major difference is that if a pathos.pool is closed, the map instance cannot run new jobs until “clear” is called, while a new multiprocess pool will be created each time the map is executed. This leads to pathos.pools generally being called with either clear=True or clear=False, and pools from multiprocess either using close=True or join=True or both. Some hierarchical parallel workflows are not allowed, and will result in an error being thrown; however, changing close, join, or clear can often remove the error.
- __call__(func, *args, **kwds)
instantiate a pool and execute the pool’s async starmap
- Parameters:
func – function object to map
*args – positional arguments for async starmap
**kwds – keyword arguments for async starmap
- Returns:
results from execution of async starmap(func, *args, **kwds)
NOTE: initializes a new worker pool with each call
- class Imap(pool=None, *args, **kwds)
Bases:
Map
map iterator with internal lazy pool instantiation
- Parameters:
pool – pool object (i.e. pathos.pools.ProcessPool)
*args – positional arguments for pool initialization
**kwds – keyword arguments for pool initialization
close – if True, close the pool to any new jobs [Default: False]
join – if True, reclaim the pool’s closed workers [Default: False]
clear – if True, delete the pool singleton [Default: False]
- NOTE: if a pool object is not provided, a builtins.map will be used.
- NOTE: pools from both multiprocess and pathos.pools can be used, however the behavior is slightly different. Pools from both pathos and multiprocess have close and join methods, to close the pool to new jobs and to shut down the pool’s workers. Pools from pathos, however, are launched as singletons, so they also include a clear method that deletes the singleton. In either case, a pool that has been “closed” will throw a ValueError if map is then called, and similarly, a ValueError will be thrown if join is called before a pool is “closed”. The major difference is that if a pathos.pool is closed, the map instance cannot run new jobs until “clear” is called, while a new multiprocess pool will be created each time the map is executed. This leads to pathos.pools generally being called with either clear=True or clear=False, and pools from multiprocess either using close=True or join=True or both. Some hierarchical parallel workflows are not allowed, and will result in an error being thrown; however, changing close, join, or clear can often remove the error.
- __call__(func, *args, **kwds)
instantiate a pool and execute the pool’s map iterator
- Parameters:
func – function object to map
*args – positional arguments for map iterator
**kwds – keyword arguments for map iterator
- Returns:
results from execution of the map(func, *args, **kwds) iterator
NOTE: initializes a new worker pool with each call
- class Ismap(pool=None, *args, **kwds)
Bases:
Map
starmap iterator with internal lazy pool instantiation
- Parameters:
pool – pool object (i.e. pathos.pools.ProcessPool)
*args – positional arguments for pool initialization
**kwds – keyword arguments for pool initialization
close – if True, close the pool to any new jobs [Default: False]
join – if True, reclaim the pool’s closed workers [Default: False]
clear – if True, delete the pool singleton [Default: False]
- NOTE: if a pool object is not provided, an itertools.starmap will be used.
- NOTE: pools from both multiprocess and pathos.pools can be used, however the behavior is slightly different. Pools from both pathos and multiprocess have close and join methods, to close the pool to new jobs and to shut down the pool’s workers. Pools from pathos, however, are launched as singletons, so they also include a clear method that deletes the singleton. In either case, a pool that has been “closed” will throw a ValueError if map is then called, and similarly, a ValueError will be thrown if join is called before a pool is “closed”. The major difference is that if a pathos.pool is closed, the map instance cannot run new jobs until “clear” is called, while a new multiprocess pool will be created each time the map is executed. This leads to pathos.pools generally being called with either clear=True or clear=False, and pools from multiprocess either using close=True or join=True or both. Some hierarchical parallel workflows are not allowed, and will result in an error being thrown; however, changing close, join, or clear can often remove the error.
- __call__(func, *args, **kwds)
instantiate a pool and execute the pool’s starmap iterator
- Parameters:
func – function object to map
*args – positional arguments for starmap iterator
**kwds – keyword arguments for starmap iterator
- Returns:
results from execution of the starmap(func, *args, **kwds) iterator
NOTE: initializes a new worker pool with each call
- class Map(pool=None, *args, **kwds)
Bases:
object
map instance with internal lazy pool instantiation
- Parameters:
pool – pool object (i.e. pathos.pools.ProcessPool)
*args – positional arguments for pool initialization
**kwds – keyword arguments for pool initialization
close – if True, close the pool to any new jobs [Default: False]
join – if True, reclaim the pool’s closed workers [Default: False]
clear – if True, delete the pool singleton [Default: False]
- NOTE: if a pool object is not provided, a builtins.map will be used with the returned iterator cast to a list.
- NOTE: pools from both multiprocess and pathos.pools can be used, however the behavior is slightly different. Pools from both pathos and multiprocess have close and join methods, to close the pool to new jobs and to shut down the pool’s workers. Pools from pathos, however, are launched as singletons, so they also include a clear method that deletes the singleton. In either case, a pool that has been “closed” will throw a ValueError if map is then called, and similarly, a ValueError will be thrown if join is called before a pool is “closed”. The major difference is that if a pathos.pool is closed, the map instance cannot run new jobs until “clear” is called, while a new multiprocess pool will be created each time the map is executed. This leads to pathos.pools generally being called with either clear=True or clear=False, and pools from multiprocess either using close=True or join=True or both. Some hierarchical parallel workflows are not allowed, and will result in an error being thrown; however, changing close, join, or clear can often remove the error.
- __attr__()
- __call__(func, *args, **kwds)
instantiate a pool and execute the pool’s map
- Parameters:
func – function object to map
*args – positional arguments for map
**kwds – keyword arguments for map
- Returns:
results from execution of map(func, *args, **kwds)
NOTE: initializes a new worker pool with each call
- __cls__()
- __del__()
shutdown the worker pool and tidy up
- property __func__
- property __get__
- __meth__()
- property __self__
- clear()
remove pool singleton, if exists
- close()
close the map to any new jobs
- join()
reclaim the closed workers
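A minimal sketch of the blocking Map (Smap and the other variants follow the same pattern), again assuming the pool class is passed in for lazy instantiation:
>>> from pathos.maps import Map
>>> from pathos.pools import ThreadPool
>>> tmap = Map(ThreadPool, 4, clear=True)  # clear deletes the pool singleton after each call
>>> tmap(pow, [1,2,3,4], [5,6,7,8])
[1, 64, 2187, 65536]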
- class Smap(pool=None, *args, **kwds)
Bases:
Map
starmap instance with internal lazy pool instantiation
- Parameters:
pool – pool object (i.e. pathos.pools.ProcessPool)
*args – positional arguments for pool initialization
**kwds – keyword arguments for pool initialization
close – if True, close the pool to any new jobs [Default: False]
join – if True, reclaim the pool’s closed workers [Default: False]
clear – if True, delete the pool singleton [Default: False]
- NOTE: if a pool object is not provided, an itertools.starmap will be used with the returned iterator cast to a list.
- NOTE: pools from both multiprocess and pathos.pools can be used, however the behavior is slightly different. Pools from both pathos and multiprocess have close and join methods, to close the pool to new jobs and to shut down the pool’s workers. Pools from pathos, however, are launched as singletons, so they also include a clear method that deletes the singleton. In either case, a pool that has been “closed” will throw a ValueError if map is then called, and similarly, a ValueError will be thrown if join is called before a pool is “closed”. The major difference is that if a pathos.pool is closed, the map instance cannot run new jobs until “clear” is called, while a new multiprocess pool will be created each time the map is executed. This leads to pathos.pools generally being called with either clear=True or clear=False, and pools from multiprocess either using close=True or join=True or both. Some hierarchical parallel workflows are not allowed, and will result in an error being thrown; however, changing close, join, or clear can often remove the error.
- __call__(func, *args, **kwds)
instantiate a pool and execute the pool’s starmap
- Parameters:
func – function object to map
*args – positional arguments for starmap
**kwds – keyword arguments for starmap
- Returns:
results from execution of starmap(func, *args, **kwds)
NOTE: initializes a new worker pool with each call
- class Uimap(pool=None, *args, **kwds)
Bases:
Map
unordered map iterator with internal lazy pool instantiation
- Parameters:
pool – pool object (i.e. pathos.pools.ProcessPool)
*args – positional arguments for pool initialization
**kwds – keyword arguments for pool initialization
close – if True, close the pool to any new jobs [Default: False]
join – if True, reclaim the pool’s closed workers [Default: False]
clear – if True, delete the pool singleton [Default: False]
- NOTE: if a pool object is not provided, NotImplemented is returned upon use.
- NOTE: pools from both multiprocess and pathos.pools can be used, however the behavior is slightly different. Pools from both pathos and multiprocess have close and join methods, to close the pool to new jobs and to shut down the pool’s workers. Pools from pathos, however, are launched as singletons, so they also include a clear method that deletes the singleton. In either case, a pool that has been “closed” will throw a ValueError if map is then called, and similarly, a ValueError will be thrown if join is called before a pool is “closed”. The major difference is that if a pathos.pool is closed, the map instance cannot run new jobs until “clear” is called, while a new multiprocess pool will be created each time the map is executed. This leads to pathos.pools generally being called with either clear=True or clear=False, and pools from multiprocess either using close=True or join=True or both. Some hierarchical parallel workflows are not allowed, and will result in an error being thrown; however, changing close, join, or clear can often remove the error.
- __call__(func, *args, **kwds)
instantiate a pool and execute the pool’s unordered map iterator
- Parameters:
func – function object to map
*args – positional arguments for unordered map iterator
**kwds – keyword arguments for unordered map iterator
- Returns:
results from execution of the unordered map(func, *args, **kwds) iterator
NOTE: initializes a new worker pool with each call
mp_map module
minimal interface to python’s multiprocessing module
Notes
This module has been deprecated in favor of pathos.pools.
- mp_map(function, sequence, *args, **kwds)
extend python’s parallel map function to multiprocessing
- Parameters:
function - target function
sequence - sequence to process in parallel
nproc - number of 'local' cpus to use [default = 'autodetect']
type - processing type ['blocking', 'non-blocking', 'unordered']
threads - if True, use threading instead of multiprocessing
multiprocessing module
This module contains map and pipe interfaces to python’s multiprocessing module.
- Pipe methods provided:
pipe - blocking communication pipe [returns: value]
apipe - asynchronous communication pipe [returns: object]
- Map methods provided:
map - blocking and ordered worker pool [returns: list]
imap - non-blocking and ordered worker pool [returns: iterator]
uimap - non-blocking and unordered worker pool [returns: iterator]
amap - asynchronous worker pool [returns: object]
Usage
A typical call to a pathos multiprocessing map will roughly follow this example:
>>> import time
>>>
>>> # instantiate and configure the worker pool
>>> from pathos.multiprocessing import ProcessPool
>>> pool = ProcessPool(nodes=4)
>>>
>>> # do a blocking map on the chosen function
>>> print(pool.map(pow, [1,2,3,4], [5,6,7,8]))
>>>
>>> # do a non-blocking map, then extract the results from the iterator
>>> results = pool.imap(pow, [1,2,3,4], [5,6,7,8])
>>> print("...")
>>> print(list(results))
>>>
>>> # do an asynchronous map, then get the results
>>> results = pool.amap(pow, [1,2,3,4], [5,6,7,8])
>>> while not results.ready():
... time.sleep(5); print(".", end=' ')
...
>>> print(results.get())
>>>
>>> # do one item at a time, using a pipe
>>> print(pool.pipe(pow, 1, 5))
>>> print(pool.pipe(pow, 2, 6))
>>>
>>> # do one item at a time, using an asynchronous pipe
>>> result1 = pool.apipe(pow, 1, 5)
>>> result2 = pool.apipe(pow, 2, 6)
>>> print(result1.get())
>>> print(result2.get())
Notes
This worker pool leverages the python’s multiprocessing module, and thus has many of the limitations associated with that module. The function f and the sequences in args must be serializable. The maps in this worker pool have full functionality whether run from a script or in the python interpreter, and work reliably for both imported and interactively-defined functions. Unlike python’s multiprocessing module, pathos.multiprocessing maps can directly utilize functions that require multiple arguments.
- class ProcessPool(*args, **kwds)
Bases:
AbstractWorkerPool
Mapper that leverages python’s multiprocessing.
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if number of nodes is not given, will autodetect processors.
- NOTE: additional keyword input is optional, with:
id - identifier for the pool
initializer - function that takes no input, called when node is spawned
initargs - tuple of args for initializers that have args
maxtasksperchild - int that limits the max number of tasks per node
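For example, a sketch using the optional maxtasksperchild keyword:
>>> # recycle each worker process after it has completed 10 tasks
>>> pool = ProcessPool(nodes=2, maxtasksperchild=10)
>>> pool.map(pow, [1,2,3,4], [5,6,7,8])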
- __get_nodes()
get the number of nodes used in the map
- __repr__()
Return repr(self).
- __set_nodes(nodes)
set the number of nodes used in the map
- __state__ = {2: <multiprocess.pool.Pool state=RUN pool_size=2>}
- _clear()
Remove server with matching state
- _serve(nodes=None)
Create a new server if one isn’t already initialized
- amap(f, *args, **kwds)
run a batch of jobs with an asynchronous map
Returns a results object which contains the results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. To retrieve the results, call the get() method on the returned results object. The call to get() blocks until all results are retrieved. Use the ready() method on the results object to check if all results are ready. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- apipe(f, *args, **kwds)
submit a job asynchronously to a queue
Returns a results object which contains the result of calling the function f on a selected worker. To retrieve the result, call the get() method on the returned results object. The call to get() blocks until the result is available. Use the ready() method on the results object to check if the result is ready.
- clear()
Remove server with matching state
- close()
close the pool to any new jobs
- imap(f, *args, **kwds)
run a batch of jobs with a non-blocking and ordered map
Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- join()
cleanup the closed worker processes
- map(f, *args, **kwds)
run a batch of jobs with a blocking and ordered map
Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- property ncpus
get the number of nodes used in the map
- property nodes
get the number of nodes used in the map
- pipe(f, *args, **kwds)
submit a job and block until results are available
Returns result of calling the function f on a selected worker. This function will block until results are available.
- restart(force=False)
restart a closed pool
- terminate()
a more abrupt close
- uimap(f, *args, **kwds)
run a batch of jobs with a non-blocking and unordered map
Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. The order of the resulting sequence is not guaranteed. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- _ProcessPool
alias of Pool
parallel module
This module contains map and pipe interfaces to the parallelpython (pp) module.
- Pipe methods provided:
pipe - blocking communication pipe [returns: value]
apipe - asynchronous communication pipe [returns: object]
- Map methods provided:
map - blocking and ordered worker pool [returns: list]
imap - non-blocking and ordered worker pool [returns: iterator]
uimap - non-blocking and unordered worker pool [returns: iterator]
amap - asynchronous worker pool [returns: object]
Usage
A typical call to a pathos pp map will roughly follow this example:
>>> import time
>>>
>>> # instantiate and configure the worker pool
>>> from pathos.pp import ParallelPool
>>> pool = ParallelPool(nodes=4)
>>>
>>> # do a blocking map on the chosen function
>>> print(pool.map(pow, [1,2,3,4], [5,6,7,8]))
>>>
>>> # do a non-blocking map, then extract the results from the iterator
>>> results = pool.imap(pow, [1,2,3,4], [5,6,7,8])
>>> print("...")
>>> print(list(results))
>>>
>>> # do an asynchronous map, then get the results
>>> results = pool.amap(pow, [1,2,3,4], [5,6,7,8])
>>> while not results.ready():
... time.sleep(5); print(".", end=' ')
...
>>> print(results.get())
>>>
>>> # do one item at a time, using a pipe
>>> print(pool.pipe(pow, 1, 5))
>>> print(pool.pipe(pow, 2, 6))
>>>
>>> # do one item at a time, using an asynchronous pipe
>>> result1 = pool.apipe(pow, 1, 5)
>>> result2 = pool.apipe(pow, 2, 6)
>>> print(result1.get())
>>> print(result2.get())
Notes
This worker pool leverages the parallelpython (pp) module, and thus has many of the limitations associated with that module. The function f and the sequences in args must be serializable. The maps in this worker pool have full functionality when run from a script, but may be somewhat limited when used in the python interpreter. Both imported and interactively-defined functions in the interpreter session may fail due to the pool failing to find the source code for the target function. For a work-around, try:
>>> # instantiate and configure the worker pool
>>> from pathos.pp import ParallelPool
>>> pool = ParallelPool(nodes=4)
>>>
>>> # wrap the function, so it can be used interactively by the pool
>>> def wrapsin(*args, **kwds):
...     from math import sin
...     return sin(*args, **kwds)
>>>
>>> # do a blocking map using the wrapped function
>>> results = pool.map(wrapsin, [1,2,3,4,5])
- class ParallelPool(*args, **kwds)
Bases:
AbstractWorkerPool
Mapper that leverages parallelpython (i.e. pp) maps.
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if number of nodes is not given, will autodetect processors.
NOTE: if a tuple of servers is not provided, defaults to localhost only.
- NOTE: additional keyword input is optional, with:
id - identifier for the pool
servers - tuple of pp.Servers
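For example, a sketch of a pool that also submits jobs to remote ppft servers; the address is a placeholder and assumes a ppserver is already listening there:
>>> pool = ParallelPool(nodes=4, servers=('localhost:9090',))
>>> print(pool.servers)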
- __get_nodes()
get the number of nodes used in the map
- __get_servers()
get the servers used in the map
- __repr__()
Return repr(self).
- __set_nodes(nodes)
set the number of nodes used in the map
- __set_servers(servers)
set the servers used in the map
- __state__ = {}
- _clear()
Remove server with matching state
- _equals(server)
check if the server is compatible
- _is_alive(server=None, negate=False, run=True)
- _serve(nodes=None, servers=None)
Create a new server if one isn’t already initialized
- amap(f, *args, **kwds)
run a batch of jobs with an asynchronous map
Returns a results object which contains the results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. To retrieve the results, call the get() method on the returned results object. The call to get() blocks until all results are retrieved. Use the ready() method on the results object to check if all results are ready. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- apipe(f, *args, **kwds)
submit a job asynchronously to a queue
Returns a results object which contains the result of calling the function f on a selected worker. To retrieve the result, call the get() method on the returned results object. The call to get() blocks until the result is available. Use the ready() method on the results object to check if the result is ready.
- clear()
Remove server with matching state
- close()
close the pool to any new jobs
- imap(f, *args, **kwds)
run a batch of jobs with a non-blocking and ordered map
Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- join()
cleanup the closed worker processes
- map(f, *args, **kwds)
run a batch of jobs with a blocking and ordered map
Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- property ncpus
get the number of nodes used in the map
- property nodes
get the number of nodes used in the map
- pipe(f, *args, **kwds)
submit a job and block until results are available
Returns result of calling the function f on a selected worker. This function will block until results are available.
- restart(force=False)
restart a closed pool
- property servers
get the servers used in the map
- terminate()
a more abrupt close
- uimap(f, *args, **kwds)
run a batch of jobs with a non-blocking and unordered map
Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. The order of the resulting sequence is not guaranteed. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- stats(pool=None)
return a string containing stats response from the pp.Server
pools module
pools: pools of pathos workers, providing map and pipe constructs
- class ParallelPool(*args, **kwds)
Bases:
AbstractWorkerPool
Mapper that leverages parallelpython (i.e. pp) maps.
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if number of nodes is not given, will autodetect processors.
NOTE: if a tuple of servers is not provided, defaults to localhost only.
- NOTE: additional keyword input is optional, with:
id - identifier for the pool
servers - tuple of pp.Servers
- __get_nodes()
get the number of nodes used in the map
- __get_servers()
get the servers used in the map
- __repr__()
Return repr(self).
- __set_nodes(nodes)
set the number of nodes used in the map
- __set_servers(servers)
set the servers used in the map
- __state__ = {}
- _clear()
Remove server with matching state
- _equals(server)
check if the server is compatible
- _is_alive(server=None, negate=False, run=True)
- _serve(nodes=None, servers=None)
Create a new server if one isn’t already initialized
- amap(f, *args, **kwds)
run a batch of jobs with an asynchronous map
Returns a results object which contains the results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. To retrieve the results, call the get() method on the returned results object. The call to get() blocks until all results are retrieved. Use the ready() method on the results object to check if all results are ready. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- apipe(f, *args, **kwds)
submit a job asynchronously to a queue
Returns a results object which contains the result of calling the function f on a selected worker. To retrieve the result, call the get() method on the returned results object. The call to get() blocks until the result is available. Use the ready() method on the results object to check if the result is ready.
- clear()
Remove server with matching state
- close()
close the pool to any new jobs
- imap(f, *args, **kwds)
run a batch of jobs with a non-blocking and ordered map
Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- join()
cleanup the closed worker processes
- map(f, *args, **kwds)
run a batch of jobs with a blocking and ordered map
Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- property ncpus
get the number of nodes used in the map
- property nodes
get the number of nodes used in the map
- pipe(f, *args, **kwds)
submit a job and block until results are available
Returns result of calling the function f on a selected worker. This function will block until results are available.
- restart(force=False)
restart a closed pool
- property servers
get the servers used in the map
- terminate()
a more abrupt close
- uimap(f, *args, **kwds)
run a batch of jobs with a non-blocking and unordered map
Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. The order of the resulting sequence is not guaranteed. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- class ProcessPool(*args, **kwds)
Bases:
AbstractWorkerPool
Mapper that leverages python’s multiprocessing.
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if number of nodes is not given, will autodetect processors.
- NOTE: additional keyword input is optional, with:
id - identifier for the pool
initializer - function that takes no input, called when node is spawned
initargs - tuple of args for initializers that have args
maxtasksperchild - int that limits the max number of tasks per node
- __get_nodes()
get the number of nodes used in the map
- __repr__()
Return repr(self).
- __set_nodes(nodes)
set the number of nodes used in the map
- __state__ = {2: <multiprocess.pool.Pool state=RUN pool_size=2>}
- _clear()
Remove server with matching state
- _serve(nodes=None)
Create a new server if one isn’t already initialized
- amap(f, *args, **kwds)
run a batch of jobs with an asynchronous map
Returns a results object which contains the results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. To retrieve the results, call the get() method on the returned results object. The call to get() blocks until all results are retrieved. Use the ready() method on the results object to check if all results are ready. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- apipe(f, *args, **kwds)
submit a job asynchronously to a queue
Returns a results object which contains the result of calling the function f on a selected worker. To retrieve the result, call the get() method on the returned results object. The call to get() blocks until the result is available. Use the ready() method on the results object to check if the result is ready.
- clear()
Remove server with matching state
- close()
close the pool to any new jobs
- imap(f, *args, **kwds)
run a batch of jobs with a non-blocking and ordered map
Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- join()
cleanup the closed worker processes
- map(f, *args, **kwds)
run a batch of jobs with a blocking and ordered map
Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- property ncpus
get the number of nodes used in the map
- property nodes
get the number of nodes used in the map
- pipe(f, *args, **kwds)
submit a job and block until results are available
Returns result of calling the function f on a selected worker. This function will block until results are available.
- restart(force=False)
restart a closed pool
- terminate()
a more abrupt close
- uimap(f, *args, **kwds)
run a batch of jobs with a non-blocking and unordered map
Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. The order of the resulting sequence is not guaranteed. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- class SerialPool(*args, **kwds)
Bases:
AbstractWorkerPool
Mapper that leverages standard (i.e. serial) python maps.
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
- __get_nodes()
get the number of nodes in the pool
- __set_nodes(nodes)
set the number of nodes in the pool
- __state__ = {}
- _exiting = False
- _is_alive(negate=False, run=True)
- clear()
hard restart
- close()
close the pool to any new jobs
- imap(f, *args, **kwds)
run a batch of jobs with a non-blocking and ordered map
Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- join()
cleanup the closed worker processes
- map(f, *args, **kwds)
run a batch of jobs with a blocking and ordered map
Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- property nodes
get the number of nodes in the pool
- pipe(f, *args, **kwds)
submit a job and block until results are available
Returns result of calling the function f on a selected worker. This function will block until results are available.
- restart(force=False)
restart a closed pool
- terminate()
a more abrupt close
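Since SerialPool leverages standard serial python maps, it can serve as a drop-in replacement when debugging parallel code; a minimal sketch:
>>> from pathos.pools import SerialPool
>>> spool = SerialPool()
>>> list(spool.imap(pow, [1,2,3], [2,2,2]))
[1, 4, 9]
>>> spool.pipe(pow, 2, 6)
64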
- class ThreadPool(*args, **kwds)
Bases:
AbstractWorkerPool
Mapper that leverages python’s threading.
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses ‘scatter-gather’ (instead of ‘worker-pool’)
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if number of nodes is not given, will autodetect processors.
- NOTE: additional keyword input is optional, with:
id - identifier for the pool
initializer - function that takes no input, called when node is spawned
initargs - tuple of args for initializers that have args
- __get_nodes()
get the number of nodes used in the map
- __repr__()
Return repr(self).
- __set_nodes(nodes)
set the number of nodes used in the map
- __state__ = {2: <multiprocess.pool.ThreadPool state=RUN pool_size=2>}
- _clear()
Remove server with matching state
- _serve(nodes=None)
Create a new server if one isn’t already initialized
- amap(f, *args, **kwds)
run a batch of jobs with an asynchronous map
Returns a results object which contains the results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. To retrieve the results, call the get() method on the returned results object. The call to get() blocks until all results are retrieved. Use the ready() method on the results object to check if all results are ready. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- apipe(f, *args, **kwds)
submit a job asynchronously to a queue
Returns a results object which contains the result of calling the function f on a selected worker. To retrieve the result, call the get() method on the returned results object. The call to get() blocks until the result is available. Use the ready() method on the results object to check if the result is ready.
- clear()
Remove server with matching state
- close()
close the pool to any new jobs
- imap(f, *args, **kwds)
run a batch of jobs with a non-blocking and ordered map
Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- join()
cleanup the closed worker processes
- map(f, *args, **kwds)
run a batch of jobs with a blocking and ordered map
Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- property nodes
get the number of nodes used in the map
- property nthreads
get the number of nodes used in the map
- pipe(f, *args, **kwds)
submit a job and block until results are available
Returns result of calling the function f on a selected worker. This function will block until results are available.
- restart(force=False)
restart a closed pool
- terminate()
a more abrupt close
- uimap(f, *args, **kwds)
run a batch of jobs with a non-blocking and unordered map
Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. The order of the resulting sequence is not guaranteed. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- _ProcessPool
alias of Pool
- _ThreadPool(processes=None, initializer=None, initargs=())
- _clear(type=None)
destroy all cached pools (of the given type)
portpicker module
This script prints out an available port number.
- class portnumber(min=0, max=65536)
Bases:
object
port selector
- Usage:
>>> pick = portnumber(min=1024, max=65535)
>>> print( pick() )
select a port number from a given range.
The first call will return a random number from the available range, and each subsequent call will return the next number in the range.
- Parameters:
min -- minimum port number [default = 0]
max -- maximum port number [default = 65536]
- __call__()
Call self as a function.
- randomport(min=1024, max=65536)
select a random port number
- Parameters:
min -- minimum port number [default = 1024]
max -- maximum port number [default = 65536]
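For example (the selected port will vary):
>>> from pathos.portpicker import randomport
>>> port = randomport(min=1024, max=65536)
>>> print(port)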
pp module
This module contains map and pipe interfaces to the parallelpython (pp) module.
- Pipe methods provided:
pipe - blocking communication pipe [returns: value]
apipe - asynchronous communication pipe [returns: object]
- Map methods provided:
map - blocking and ordered worker pool [returns: list]
imap - non-blocking and ordered worker pool [returns: iterator]
uimap - non-blocking and unordered worker pool [returns: iterator]
amap - asynchronous worker pool [returns: object]
Usage
A typical call to a pathos pp map will roughly follow this example:
>>> import time
>>>
>>> # instantiate and configure the worker pool
>>> from pathos.pp import ParallelPool
>>> pool = ParallelPool(nodes=4)
>>>
>>> # do a blocking map on the chosen function
>>> print(pool.map(pow, [1,2,3,4], [5,6,7,8]))
>>>
>>> # do a non-blocking map, then extract the results from the iterator
>>> results = pool.imap(pow, [1,2,3,4], [5,6,7,8])
>>> print("...")
>>> print(list(results))
>>>
>>> # do an asynchronous map, then get the results
>>> results = pool.amap(pow, [1,2,3,4], [5,6,7,8])
>>> while not results.ready():
... time.sleep(5); print(".", end=' ')
...
>>> print(results.get())
>>>
>>> # do one item at a time, using a pipe
>>> print(pool.pipe(pow, 1, 5))
>>> print(pool.pipe(pow, 2, 6))
>>>
>>> # do one item at a time, using an asynchronous pipe
>>> result1 = pool.apipe(pow, 1, 5)
>>> result2 = pool.apipe(pow, 2, 6)
>>> print(result1.get())
>>> print(result2.get())
Notes
This worker pool leverages the parallelpython (pp) module, and thus has many of the limitations associated with that module. The function f and the sequences in args must be serializable. The maps in this worker pool have full functionality when run from a script, but may be somewhat limited when used in the python interpreter. Both imported and interactively-defined functions in the interpreter session may fail due to the pool failing to find the source code for the target function. For a work-around, try:
>>> # instantiate and configure the worker pool
>>> from pathos.pp import ParallelPool
>>> pool = ParallelPool(nodes=4)
>>>
>>> # wrap the function, so it can be used interactively by the pool
>>> def wrapsin(*args, **kwds):
...     from math import sin
...     return sin(*args, **kwds)
>>>
>>> # do a blocking map using the wrapped function
>>> results = pool.map(wrapsin, [1,2,3,4,5])
- ParallelPythonPool
alias of ParallelPool
- stats(pool=None)
return a string containing stats response from the pp.Server
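As a small usage sketch, stats can be printed after running jobs on a ParallelPool (the report format is whatever the underlying pp.Server produces):
>>> from pathos.pp import ParallelPool, stats
>>> pool = ParallelPool(nodes=4)
>>> results = pool.map(pow, [1,2,3,4], [5,6,7,8])
>>> print(stats())    # job/server statistics from the pp.Server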
pp_map module
minimal interface to python’s pp (parallel python) module
Implements a work-alike of the builtin map function that distributes work across many processes. As it uses ppft to do the actual parallel processing, code using this must conform to the usual ppft restrictions (arguments must be serializable, etc).
Notes
This module has been deprecated in favor of pathos.pools.
- pp_map(function, sequence, *args, **kwds)
extend python’s map function to parallel python
- Parameters:
function - target function
sequence - sequence to process in parallel
ncpus - number of 'local' processors to use [default = 'autodetect']
servers - available distributed parallel python servers [default = ()]
- ppmap(processes, function, sequence, *sequences)
Split the work of ‘function’ across the given number of processes. Set ‘processes’ to None to let Parallel Python autodetect the number of children to use.
Although the calling semantics should be identical to __builtin__.map (even using __builtin__.map to process arguments), it differs in that it returns a generator instead of a list. This enables lazy evaluation of the results so that other work can be done while the subprocesses are still running.
>>> def rangetotal(n): return n, sum(range(n))
>>> list(map(rangetotal, range(1, 6)))
[(1, 0), (2, 1), (3, 3), (4, 6), (5, 10)]
>>> list(ppmap(1, rangetotal, range(1, 6)))
[(1, 0), (2, 1), (3, 3), (4, 6), (5, 10)]
- print_stats(servers=None)
print stats from the pp.Server
- stats(pool=None)
return a string containing stats response from the pp.Server
profile module
This module contains functions for profiling in other threads and processes.
- Functions for identifying a thread/process:
process_id - get the identifier (process id) for the current process
thread_id - get the identifier for the current thread
- Functions for controlling profiling:
enable_profiling - initialize a profiler in the current thread/process
start_profiling - begin profiling everything in the current thread/process
stop_profiling - stop profiling everything in the current thread/process
disable_profiling - remove the profiler from the current thread/process
- Functions that control profile statistics (pstats) output:
clear_stats - clear stored pstats from the current thread/process
get_stats - get stored pstats for the current thread/process
print_stats - print stored pstats for the current thread/process
dump_stats - dump stored pstats for the current thread/process
- Functions that add/remove profiling:
profiled - decorator to add profiling to a function
not_profiled - decorator to remove profiling from a function
profile - decorator for profiling a function (will enable_profiling)
Usage
Typical calls to pathos profiling will roughly follow this example:
>>> import time
>>> import random
>>> import pathos.profile as pr
>>>
>>> # build a worker function
>>> def _work(i):
... x = random.random()
... time.sleep(x)
... return (i,x)
>>>
>>> # generate a 'profiled' work function
>>> config = dict(gen=pr.process_id)
>>> work = pr.profiled(**config)(_work)
>>>
>>> # enable profiling
>>> pr.enable_profiling()
>>>
>>> # profile the work (not the map internals) in the main process
>>> for i in map(work, range(-10,0)):
... print(i)
...
>>> # profile the map in the main process, and work in the other process
>>> from pathos.helpers import mp
>>> pool = mp.Pool(10)
>>> _uimap = pr.profiled(**config)(pool.imap_unordered)
>>> for i in _uimap(work, range(-10,0)):
... print(i)
...
>>> # deactivate all profiling
>>> pr.disable_profiling() # in the main process
>>> tuple(_uimap(pr.disable_profiling, range(10))) # in the workers
>>> for i in _uimap(work, range(-20,-10)):
... print(i)
...
>>> # re-activate profiling
>>> pr.enable_profiling()
>>>
>>> # print stats for profile of 'import math' in another process
>>> def test_import(module):
... __import__(module)
...
>>> import pathos.pools as pp
>>> pool = pp.ProcessPool(1)
>>> pr.profile('cumulative', pipe=pool.pipe)(test_import, 'pox')
10 function calls in 0.003 seconds
Ordered by: cumulative time
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.000 0.000 0.003 0.003 <stdin>:1(test_import)
1 0.002 0.002 0.003 0.003 {__import__}
1 0.001 0.001 0.001 0.001 __init__.py:8(<module>)
1 0.000 0.000 0.000 0.000 shutils.py:11(<module>)
1 0.000 0.000 0.000 0.000 _disk.py:15(<module>)
1 0.000 0.000 0.000 0.000 {eval}
1 0.000 0.000 0.000 0.000 utils.py:11(<module>)
1 0.000 0.000 0.000 0.000 <string>:1(<module>)
1 0.000 0.000 0.000 0.000 info.py:2(<module>)
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
>>> pool.close()
>>> pool.join()
>>> pool.clear()
Notes
This module leverages python’s cProfile module, and is primarily a high-level interface to that module that strives to make profiling in a different thread or process easier. The use of pathos.pools is suggested, but not required (as seen in the example above).
In many cases, profiling in another thread is not necessary, and either of the following can be sufficient/better for timing and profiling:
$ python -c "import time; s=time.time(); import pathos; print (time.time()-s)"
$ python -c "import cProfile; p=cProfile.Profile(); p.enable(); import pathos; p.print_stats('cumulative')"
This module was inspired by: http://stackoverflow.com/a/32522579/4646678.
- clear_stats(*args)
clear all stored profiling results from the current thread/process
- disable_profiling(*args)
remove the profiler instance from the current thread/process
- dump_stats(*args, **kwds)
dump all stored profiling results for the current thread/process
Notes
see pathos.profile.profiled for settings for *args and **kwds
- enable_profiling(*args)
initialize a profiler instance in the current thread/process
- get_stats(*args)
get all stored profiling results for the current thread/process
- not_profiled(f)
decorator to remove profiling (due to ‘profiled’) from a function
- print_stats(*args, **kwds)
print all stored profiling results for the current thread/process
- process_id()
get the identifier (process id) for the current process
- class profile(sort=None, **config)
Bases:
object
decorator for profiling a function (will enable profiling)
sort is integer index of column in pstats output for sorting
- Important class members:
pipe - pipe instance in which profiling is active
Example
>>> import time
>>> import random
>>> import pathos.profile as pr
>>>
>>> def work():
...     x = random.random()
...     time.sleep(x)
...     return x
...
>>> # profile the work; print pstats info
>>> pr.profile()(work)
4 function calls in 0.136 seconds
Ordered by: standard name
ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1    0.000    0.000    0.136    0.136 <stdin>:1(work)
     1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
     1    0.000    0.000    0.000    0.000 {method 'random' of '_random.Random' objects}
     1    0.136    0.136    0.136    0.136 {time.sleep}
0.1350568110491419
- NOTE: pipe provided should come from a pool built with nodes=1. Other
configuration keywords (config) are passed to ‘pr.profiled’. Output can be ordered by setting sort to one of the following:
'calls' - call count
'cumulative' - cumulative time
'cumtime' - cumulative time
'file' - file name
'filename' - file name
'module' - file name
'ncalls' - call count
'pcalls' - primitive call count
'line' - line number
'name' - function name
'nfl' - name/file/line
'stdname' - standard name
'time' - internal time
'tottime' - internal time
- __call__(function, *args, **kwds)
Call self as a function.
- class profiled(gen=None, prefix='id-', suffix='.prof')
Bases:
object
decorator for profiling a function (does not call enable profiling)
y=gen(), with y an identifier (e.g. current_process().pid)
- Important class members:
prefix - string prefix for pstats filename [default: 'id-']
suffix - string suffix for pstats filename [default: '.prof']
pid - function for obtaining id of current process/thread
sort - integer index of column in pstats output for sorting
Example
>>> import time
>>> import random
>>> import pathos.profile as pr
>>>
>>> config = dict(gen=pr.process_id)
>>> @pr.profiled(**config)
... def work(i):
...     x = random.random()
...     time.sleep(x)
...     return (i,x)
...
>>> pr.enable_profiling()
>>> # profile the work (not the map internals); write to file for pstats
>>> for i in map(work, range(-10,0)):
...     print(i)
...
- NOTE: If gen is a bool or string, then sort=gen and pid is not used.
Otherwise, pid=gen and sort is not used. Output can be ordered by setting gen to one of the following:
'calls' - call count
'cumulative' - cumulative time
'cumtime' - cumulative time
'file' - file name
'filename' - file name
'module' - file name
'ncalls' - call count
'pcalls' - primitive call count
'line' - line number
'name' - function name
'nfl' - name/file/line
'stdname' - standard name
'time' - internal time
'tottime' - internal time
- __call__(f)
Call self as a function.
- start_profiling(*args)
begin profiling everything in the current thread/process
- stop_profiling(*args)
stop profiling everything in the current thread/process
- thread_id()
get the identifier for the current thread
python module
This module contains map and pipe interfaces to standard (i.e. serial) python.
- Pipe methods provided:
pipe - blocking communication pipe [returns: value]
- Map methods provided:
map - blocking and ordered worker pool [returns: list]
imap - non-blocking and ordered worker pool [returns: iterator]
Usage
A typical call to a pathos python map will roughly follow this example:
>>> # instantiate and configure the worker pool
>>> from pathos.serial import SerialPool
>>> pool = SerialPool()
>>>
>>> # do a blocking map on the chosen function
>>> print(pool.map(pow, [1,2,3,4], [5,6,7,8]))
>>>
>>> # do a non-blocking map, then extract the results from the iterator
>>> results = pool.imap(pow, [1,2,3,4], [5,6,7,8])
>>> print("...")
>>> print(list(results))
>>>
>>> # do one item at a time, using a pipe
>>> print(pool.pipe(pow, 1, 5))
>>> print(pool.pipe(pow, 2, 6))
Notes
This worker pool leverages the built-in python maps, and thus does not have limitations due to serialization of the function f or the sequences in args. The maps in this worker pool have full functionality whether run from a script or in the python interpreter, and work reliably for both imported and interactively-defined functions.
- PythonSerial
alias of SerialPool
secure module
- class Copier(name=None, **kwds)
Bases:
Pipe
a popen-based copier for parallel and distributed computing.
create a copier
- Inputs:
name: a unique identifier (string) for the launcher
source: hostname:path of original [user@host:path is also valid]
destination: hostname:path for copy [user@host:path is also valid]
launcher: remote service mechanism (i.e. scp, cp) [default = 'scp']
options: remote service options (i.e. -v, -P) [default = '']
background: run in background [default = False]
decode: ensure response is 'ascii' [default = True]
stdin: file-like object to serve as standard input for the remote process
- __call__(**kwds)
configure the copier using given keywords:
- (Re)configure the copier for the following inputs:
source: hostname:path of original [user@host:path is also valid]
destination: hostname:path for copy [user@host:path is also valid]
launcher: remote service mechanism (i.e. scp, cp) [default = 'scp']
options: remote service options (i.e. -v, -P) [default = '']
background: run in background [default = False]
decode: ensure response is 'ascii' [default = True]
stdin: file-like object to serve as standard input for the remote process
- config(**kwds)
configure the copier using given keywords:
- (Re)configure the copier for the following inputs:
source: hostname:path of original [user@host:path is also valid]
destination: hostname:path for copy [user@host:path is also valid]
launcher: remote service mechanism (i.e. scp, cp) [default = 'scp']
options: remote service options (i.e. -v, -P) [default = '']
background: run in background [default = False]
decode: ensure response is 'ascii' [default = True]
stdin: file-like object to serve as standard input for the remote process
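A hedged sketch of local use; the launch() call is an assumption based on the Pipe base class, and the file paths are hypothetical:
>>> from pathos.secure import Copier
>>> copier = Copier('copy')
>>> # use 'cp' as the launcher so the copy runs locally (no ssh required)
>>> copier(source='./input.txt', destination='./backup.txt', launcher='cp')
>>> copier.launch()   # assumed: inherited from the Pipe base class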
- class Pipe(name=None, **kwds)
Bases:
Pipe
a popen-based ssh-pipe for parallel and distributed computing.
create an ssh pipe
- Inputs:
name: a unique identifier (string) for the pipe
host: hostname to receive command [user@host is also valid]
command: a command to send [default = 'echo <name>']
launcher: remote service mechanism (i.e. ssh, rsh) [default = 'ssh']
options: remote service options (i.e. -v, -N, -L) [default = '']
background: run in background [default = False]
decode: ensure response is 'ascii' [default = True]
stdin: file-like object to serve as standard input for the remote process
- __call__(**kwds)
configure a remote command using given keywords:
- (Re)configure the pipe for the following inputs:
host: hostname to receive command [user@host is also valid]
command: a command to send [default = 'echo <name>']
launcher: remote service mechanism (i.e. ssh, rsh) [default = 'ssh']
options: remote service options (i.e. -v, -N, -L) [default = '']
background: run in background [default = False]
decode: ensure response is 'ascii' [default = True]
stdin: file-like object to serve as standard input for the remote process
- config(**kwds)
configure a remote command using given keywords:
- (Re)configure the pipe for the following inputs:
host: hostname to receive command [user@host is also valid]
command: a command to send [default = 'echo <name>']
launcher: remote service mechanism (i.e. ssh, rsh) [default = 'ssh']
options: remote service options (i.e. -v, -N, -L) [default = '']
background: run in background [default = False]
decode: ensure response is 'ascii' [default = True]
stdin: file-like object to serve as standard input for the remote process
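A hedged sketch of running a remote command; the host is hypothetical, and launch()/response() are assumed to be inherited from the Pipe base class:
>>> from pathos.secure import Pipe
>>> p = Pipe('hostcheck')
>>> p(command='hostname', host='user@remote.host')   # hypothetical host
>>> p.launch()             # assumed: inherited from the Pipe base class
>>> print(p.response())    # assumed: inherited from the Pipe base class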
- class Tunnel(name=None, **kwds)
Bases:
object
an ssh-tunnel launcher for parallel and distributed computing.
create an ssh tunnel launcher
- Inputs:
name – a unique identifier (string) for the launcher
- MAXPORT = 65535
- MINPORT = 1024
- __disconnect()
disconnect tunnel internals
- __repr__()
Return repr(self).
- _connect(localport, remotehost, remoteport, through=None)
- connect(host, port=None, through=None)
establish a secure shell tunnel between local and remote host
- Input:
host – remote hostname [user@host:path is also valid]
port – remote port number
- Additional Input:
through – ‘tunnel-through’ hostname [default = None]
- disconnect()
destroy the ssh tunnel
- verbose = True
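A hedged sketch of tunnel use; the remote host and port are hypothetical, and it is assumed here that connect() returns the selected local port:
>>> from pathos.secure import Tunnel
>>> t = Tunnel('tunnel')
>>> localport = t.connect('user@remote.host', 5672)   # assumed return value
>>> # ... point a client at localhost:localport to use the tunnel ...
>>> t.disconnect()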
selector module
This module implements a selector class, which can be used to dispatch events and wrangle event handlers.
- class Selector
Bases:
object
Selector object for watching and event notification.
Takes no initial input.
- _TIMEOUT = 0.5
- _cleanup()
- _debug = <Logger pathos.selector (WARNING)>
- _dispatch(handlers, entities)
- _watch()
- notifyOnException(fd, handler)
add <handler> to the list of routines to call when <fd> raises an exception
- notifyOnInterrupt(handler)
add <handler> to the list of routines to call when a signal arrives
- notifyOnReadReady(fd, handler)
add <handler> to the list of routines to call when <fd> is read ready
- notifyOnWriteReady(fd, handler)
add <handler> to the list of routines to call when <fd> is write ready
- notifyWhenIdle(handler)
add <handler> to the list of routines to call when a timeout occurs
- watch(timeout=None)
dispatch events to the registered handlers
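As a rough sketch of the event loop, assuming the handler signature handler(selector, fd) and that setting selector.state = False stops the watch loop (both are assumptions, not documented guarantees):
>>> import os
>>> from pathos.selector import Selector
>>>
>>> r, w = os.pipe()
>>> s = Selector()
>>>
>>> def onRead(selector, fd):      # assumed handler signature
...     print(os.read(fd, 64))
...     selector.state = False     # assumed: stops the watch loop
...
>>> s.notifyOnReadReady(r, onRead)
>>> os.write(w, b'ping')
>>> s.watch(timeout=2.0)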
serial module
This module contains map and pipe interfaces to standard (i.e. serial) python.
- Pipe methods provided:
pipe - blocking communication pipe [returns: value]
- Map methods provided:
map - blocking and ordered worker pool [returns: list]
imap - non-blocking and ordered worker pool [returns: iterator]
Usage
A typical call to a pathos python map will roughly follow this example:
>>> # instantiate and configure the worker pool
>>> from pathos.serial import SerialPool
>>> pool = SerialPool()
>>>
>>> # do a blocking map on the chosen function
>>> print(pool.map(pow, [1,2,3,4], [5,6,7,8]))
>>>
>>> # do a non-blocking map, then extract the results from the iterator
>>> results = pool.imap(pow, [1,2,3,4], [5,6,7,8])
>>> print("...")
>>> print(list(results))
>>>
>>> # do one item at a time, using a pipe
>>> print(pool.pipe(pow, 1, 5))
>>> print(pool.pipe(pow, 2, 6))
Notes
This worker pool leverages the built-in python maps, and thus does not have limitations due to serialization of the function f or the sequences in args. The maps in this worker pool have full functionality whether run from a script or in the python interpreter, and work reliably for both imported and interactively-defined functions.
- class SerialPool(*args, **kwds)
Bases:
AbstractWorkerPool
Mapper that leverages standard (i.e. serial) python maps.
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses 'scatter-gather' (instead of 'worker-pool')
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
- __get_nodes()
get the number of nodes in the pool
- __set_nodes(nodes)
set the number of nodes in the pool
- __state__ = {}
- _exiting = False
- _is_alive(negate=False, run=True)
- clear()
hard restart
- close()
close the pool to any new jobs
- imap(f, *args, **kwds)
run a batch of jobs with a non-blocking and ordered map
Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- join()
cleanup the closed worker processes
- map(f, *args, **kwds)
run a batch of jobs with a blocking and ordered map
Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- property nodes
get the number of nodes in the pool
- pipe(f, *args, **kwds)
submit a job and block until results are available
Returns result of calling the function f on a selected worker. This function will block until results are available.
- restart(force=False)
restart a closed pool
- terminate()
a more abrupt close
- __get_nodes__(self)
get the number of nodes in the pool
- __set_nodes__(self, nodes)
set the number of nodes in the pool
server module
This module contains the base class for pathos servers, and describes the pathos server interface. If a third-party RPC server is selected, such as ‘parallel python’ (i.e. ‘pp’) or ‘RPyC’, direct calls to the third-party interface are currently used.
- class Server
Bases:
object
Server base class for pathos servers for parallel and distributed computing.
Takes no initial input.
- activate(onTimeout=None, selector=None)
configure the selector and install the timeout callback
- deactivate()
turn off the selector
- selector()
get the selector
- serve(timeout)
begin serving, and set the timeout
threading module
This module contains map and pipe interfaces to python’s threading module.
- Pipe methods provided:
pipe - blocking communication pipe [returns: value]
apipe - asynchronous communication pipe [returns: object]
- Map methods provided:
map - blocking and ordered worker pool [returns: list]
imap - non-blocking and ordered worker pool [returns: iterator]
uimap - non-blocking and unordered worker pool [returns: iterator]
amap - asynchronous worker pool [returns: object]
Usage
A typical call to a pathos threading map will roughly follow this example:
>>> # instantiate and configure the worker pool
>>> from pathos.threading import ThreadPool
>>> pool = ThreadPool(nodes=4)
>>>
>>> # do a blocking map on the chosen function
>>> print(pool.map(pow, [1,2,3,4], [5,6,7,8]))
>>>
>>> # do a non-blocking map, then extract the results from the iterator
>>> results = pool.imap(pow, [1,2,3,4], [5,6,7,8])
>>> print("...")
>>> print(list(results))
>>>
>>> # do an asynchronous map, then get the results
>>> results = pool.amap(pow, [1,2,3,4], [5,6,7,8])
>>> while not results.ready():
... time.sleep(5); print(".", end=' ')
...
>>> print(results.get())
>>>
>>> # do one item at a time, using a pipe
>>> print(pool.pipe(pow, 1, 5))
>>> print(pool.pipe(pow, 2, 6))
>>>
>>> # do one item at a time, using an asynchronous pipe
>>> result1 = pool.apipe(pow, 1, 5)
>>> result2 = pool.apipe(pow, 2, 6)
>>> print(result1.get())
>>> print(result2.get())
Notes
This worker pool leverages python’s multiprocessing.dummy module, and thus has many of the limitations associated with that module. The function f and the sequences in args must be serializable. The maps in this worker pool have full functionality whether run from a script or in the python interpreter, and work reliably for both imported and interactively-defined functions. Unlike python’s multiprocessing.dummy module, pathos.threading maps can directly utilize functions that require multiple arguments.
- class ThreadPool(*args, **kwds)
Bases:
AbstractWorkerPool
Mapper that leverages python’s threading.
- Important class members:
nodes - number (and potentially description) of workers
ncpus - number of worker processors
servers - list of worker servers
scheduler - the associated scheduler
workdir - associated $WORKDIR for scratch calculations/files
- Other class members:
scatter - True, if uses 'scatter-gather' (instead of 'worker-pool')
source - False, if minimal use of TemporaryFiles is desired
timeout - number of seconds to wait for return value from scheduler
NOTE: if number of nodes is not given, will autodetect processors.
- NOTE: additional keyword input is optional, with:
id - identifier for the pool
initializer - function that takes no input, called when node is spawned
initargs - tuple of args for initializers that have args
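A minimal configuration sketch using these keywords (the initializer here is illustrative):
>>> from pathos.threading import ThreadPool
>>>
>>> def setup():        # runs once in each spawned node, per the note above
...     pass            # e.g. configure thread-local state
...
>>> pool = ThreadPool(nodes=2, initializer=setup)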
- __get_nodes()
get the number of nodes used in the map
- __repr__()
Return repr(self).
- __set_nodes(nodes)
set the number of nodes used in the map
- __state__ = {2: <multiprocess.pool.ThreadPool state=RUN pool_size=2>}
- _clear()
Remove server with matching state
- _serve(nodes=None)
Create a new server if one isn’t already initialized
- amap(f, *args, **kwds)
run a batch of jobs with an asynchronous map
Returns a results object which contains the results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. To retrieve the results, call the get() method on the returned results object. The call to get() is blocking, until all results are retrieved. Use the ready() method on the result object to check if all results are ready. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- apipe(f, *args, **kwds)
submit a job asynchronously to a queue
Returns a results object which contains the result of calling the function f on a selected worker. To retrieve the results, call the get() method on the returned results object. The call to get() is blocking, until the result is available. Use the ready() method on the results object to check if the result is ready.
- clear()
Remove server with matching state
- close()
close the pool to any new jobs
- imap(f, *args, **kwds)
run a batch of jobs with a non-blocking and ordered map
Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- join()
cleanup the closed worker processes
- map(f, *args, **kwds)
run a batch of jobs with a blocking and ordered map
Returns a list of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- property nodes
get the number of nodes used in the map
- property nthreads
get the number of nodes used in the map
- pipe(f, *args, **kwds)
submit a job and block until results are available
Returns result of calling the function f on a selected worker. This function will block until results are available.
- restart(force=False)
restart a closed pool
- terminate()
a more abrupt close
- uimap(f, *args, **kwds)
run a batch of jobs with a non-blocking and unordered map
Returns a list iterator of results of applying the function f to the items of the argument sequence(s). If more than one sequence is given, the function is called with an argument list consisting of the corresponding item of each sequence. The order of the resulting sequence is not guaranteed. Some maps accept the chunksize keyword, which causes the sequence to be split into tasks of approximately the given size.
- _ThreadPool(processes=None, initializer=None, initargs=())
util module
utilities for distributed computing
- _b(string, codec=None)
convert string to bytes using the given codec (default is ‘ascii’)
- _str(byte, codec=None)
convert bytes to string using the given codec (default is ‘ascii’)
- print_exc_info()
thread-safe return of string from print_exception call
- spawn(onParent, onChild)
a unidirectional fork wrapper
- Calls onParent(pid, fromchild) in parent process,
onChild(pid, toparent) in child process.
- spawn2(onParent, onChild)
a bidirectional fork wrapper
- Calls onParent(pid, fromchild, tochild) in parent process,
onChild(pid, fromparent, toparent) in child process.
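A hedged sketch of the unidirectional case, assuming fromchild and toparent behave like file objects (consistent with the 'file-like' conventions used elsewhere in pathos):
>>> import os
>>> from pathos.util import spawn
>>>
>>> def onParent(pid, fromchild):          # runs in the parent process
...     print(fromchild.readline().rstrip())
...     os.waitpid(pid, 0)                 # reap the child
...
>>> def onChild(pid, toparent):            # runs in the child process
...     toparent.write('hello from the child\n')
...     toparent.flush()
...     os._exit(0)
...
>>> spawn(onParent, onChild)
hello from the child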
xmlrpc module
- class XMLRPCRequestHandler(server, socket)
Bases:
BaseHTTPRequestHandler
create an XML-RPC request handler
Override BaseHTTPRequestHandler.__init__(): we need to be able to have (potentially) multiple handler objects at a given time.
- Inputs:
server – server object to handle requests for
socket – socket connection
- _debug = <Logger pathos.xmlrpc (WARNING)>
- _sendResponse(response)
Write the XML-RPC response
- do_POST()
Access point from HTTP handler
- log_message(format, *args)
Overriding BaseHTTPRequestHandler.log_message()
- class XMLRPCServer(host, port)
Bases:
Server, SimpleXMLRPCDispatcher
extends base pathos server to an XML-RPC dispatcher
create an XML-RPC server
- Takes two initial inputs:
host – hostname of XML-RPC server host
port – port number for server requests
- _handleMessageFromChild(selector, fd)
handler for message from a child process
- _installSocket(host, port)
prepare a listening socket
- _marshaled_dispatch(data, dispatch_method=None)
override SimpleXMLRPCDispatcher._marshaled_dispatch() fault string
- _onConnection(selector, fd)
upon socket connection, establish a request handler
- _onSelectorIdle(selector)
something to do when there are no requests
- _onSocketConnection(socket)
upon socket connections, establish a request handler
- _registerChild(pid, fromchild)
register a child process so it can be retrieved on select events
- _unRegisterChild(fd)
remove a child process from active process register
- activate()
install callbacks
- serve()
enter the select loop… and wait for service requests
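A minimal server sketch; the host and port are hypothetical, register_function comes from the SimpleXMLRPCDispatcher base, and serve() blocks in the select loop:
>>> from pathos.xmlrpc import XMLRPCServer
>>>
>>> def add(x, y):
...     return x + y
...
>>> server = XMLRPCServer('localhost', 12345)   # hypothetical host/port
>>> server.register_function(add)    # from SimpleXMLRPCDispatcher
>>> server.activate()                # install callbacks
>>> server.serve()                   # blocks: enters the select loop
A client could then invoke the method with, e.g., xmlrpc.client.ServerProxy('http://localhost:12345').add(1, 2).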