pathos.helpers module documentation

mp_helper module

map helper functions

random_seed(s=None)

sets the seed for calls to ‘random()’

random_state(module='random', new=False, seed='!')

return an (optionally manually seeded) random generator

For a given module, return an object that provides random number generation (RNG) methods. If new=False, use the global copy of the RNG object. If seed='!', do not reseed the RNG (whereas seed=None 'removes' any seeding). If seed='*', use a seed that depends on the process id (PID); this is useful for building RNGs that differ across multiple threads or processes.
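
A minimal sketch of both helpers, assuming they are importable from pathos.helpers.mp_helper as documented above:

    from pathos.helpers.mp_helper import random_seed, random_state

    # seed the global 'random' generator
    random_seed(123)

    # build a fresh generator seeded from the process id (PID),
    # so each worker process draws a different random stream
    rng = random_state(module='random', new=True, seed='*')
    print(rng.random())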

starargs(f)

decorator to convert a many-arg function to a single-arg function
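
For example, a short sketch of the decorator in use, assuming it wraps f(*args) as a function of a single tuple argument:

    from pathos.helpers.mp_helper import starargs

    @starargs
    def add(x, y):
        return x + y

    # the decorated function now takes one tuple argument, which is
    # handy for map-style calls that pass one item per invocation
    print(add((1, 2)))                        # -> 3
    print(list(map(add, [(1, 2), (3, 4)])))   # -> [3, 7]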

pp_helper module

class ApplyResult(task)

Bases: pp._pp._Task

result object for an 'apply' method in Parallel Python

enables a pp._Task to mimic the multiprocessing.pool.ApplyResult interface
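
Since the interface mirrors multiprocessing.pool.ApplyResult, the usual polling pattern applies. A sketch using a pathos ProcessPool, whose asynchronous results expose the same interface (the pool import is an assumption here, not part of this module):

    from pathos.pools import ProcessPool

    pool = ProcessPool(2)
    result = pool.apipe(pow, 2, 10)   # asynchronous 'apply'
    result.wait(timeout=5)            # block (up to 5s) for the task
    if result.ready() and result.successful():
        print(result.get())           # -> 1024
    pool.close(); pool.join()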

_ApplyResult__unpickle()

Unpickles the result of the task

__call__(raw_result=False)

Retrieves the result of the task

__init__(task)

Initializes the task

__module__ = 'pathos.helpers.pp_helper'
callback
callbackargs
finalize(sresult)

Finalizes the task (internal use only)

finished
get(timeout=None)

Retrieves the result of the task

group
lock
ready()

Checks if the result is ready

server
successful()

Checks whether the result is ready and loaded without printing an error

tid
wait(timeout=None)

Waits for the task

class MapResult(size, callback=None, callbackargs=(), group='default')

Bases: object

_MapResult__unpickle()

Unpickles the results of the tasks

__call__()

Retrieves the results of the tasks

__dict__ = mappingproxy({'__module__': 'pathos.helpers.pp_helper', '__init__': <function MapResult.__init__>, 'finalize': <function MapResult.finalize>, '_MapResult__unpickle': <function MapResult.__unpickle>, 'queue': <function MapResult.queue>, '__call__': <function MapResult.__call__>, 'wait': <function MapResult.wait>, 'get': <function MapResult.get>, 'ready': <function MapResult.ready>, 'successful': <function MapResult.successful>, '_set': <function MapResult._set>, '__dict__': <attribute '__dict__' of 'MapResult' objects>, '__weakref__': <attribute '__weakref__' of 'MapResult' objects>, '__doc__': None})
__init__(size, callback=None, callbackargs=(), group='default')

Initialize self. See help(type(self)) for accurate signature.

__module__ = 'pathos.helpers.pp_helper'
__weakref__

list of weak references to the object (if defined)

_set(i, task)
finalize(*results)

Finalizes the tasks (internal use only)

get(timeout=None)

Retrieves the results of the tasks

queue(*tasks)

Fills the MapResult with ApplyResult objects

ready()

Checks if the result is ready

successful()

Checks whether the result is ready and loaded without printing an error

wait(timeout=None)

Waits for the tasks

class Server(ncpus='autodetect', ppservers=(), secret=None, restart=False, proto=2, socket_timeout=3600)

Bases: object

Parallel Python SMP execution server class

Creates Server instance

ncpus - the number of worker processes to start on the local computer; if the parameter is omitted, it will be set to the number of processors in the system
ppservers - list of active parallel python execution servers to connect with
secret - passphrase for network connections; if omitted, a default passphrase will be used. It is highly recommended to use a custom passphrase for all network connections.
restart - restart the worker process after each task completion
proto - protocol number for the pickle module
socket_timeout - socket timeout in seconds, which is the maximum time a remote job is allowed to run. Increase this value if you have long-running jobs, or decrease it if connectivity to remote ppservers is often lost.

With ncpus = 1, all tasks are executed sequentially. For best performance, either use the default "autodetect" value or set ncpus to the total number of processors in the system.
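
A minimal sketch of direct Server use, following the signatures documented here (functions submitted to pp must be defined at module level, since their source is serialized):

    from pathos.helpers.pp_helper import Server

    def cube(x):
        return x**3

    job_server = Server(ncpus=2)          # two local ppworkers

    # submit returns a task object; calling it blocks and
    # returns the function's result
    job = job_server.submit(cube, (3,))
    print(job())                          # -> 27

    job_server.print_stats()              # per-node execution statistics
    job_server.destroy()                  # kill ppworkers, close open files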

_Server__add_to_active_tasks(num)

Updates the number of active tasks

_Server__connect()

Connects to all remote ppservers

_Server__detect_ncpus()

Detects the number of effective CPUs in the system

_Server__dumpsfunc(funcs, modules)

Serializes functions and modules

_Server__find_modules(prefix, dict)

Recursively finds all the modules in dict

_Server__gentid()

Generates a unique job ID number

_Server__get_source(func)

Fetches source of the function

_Server__scheduler()

Schedules jobs for execution

_Server__stat_add_job(node)

Increments job count on the node

_Server__stat_add_time(node, time_add)

Updates total runtime on the node

_Server__update_active_rworkers(id, count)

Updates list of active rworkers

__del__()
__dict__ = mappingproxy({'__module__': 'pp._pp', '__doc__': 'Parallel Python SMP execution server class\n ', 'default_port': 60000, 'default_secret': 'epo20pdosl;dksldkmm', '__init__': <function Server.__init__>, 'submit': <function Server.submit>, 'wait': <function Server.wait>, 'get_ncpus': <function Server.get_ncpus>, 'set_ncpus': <function Server.set_ncpus>, 'get_active_nodes': <function Server.get_active_nodes>, 'get_stats': <function Server.get_stats>, 'print_stats': <function Server.print_stats>, 'insert': <function Server.insert>, 'connect1': <function Server.connect1>, '_Server__connect': <function Server.__connect>, '_Server__detect_ncpus': <function Server.__detect_ncpus>, '_Server__dumpsfunc': <function Server.__dumpsfunc>, '_Server__find_modules': <function Server.__find_modules>, '_Server__scheduler': <function Server.__scheduler>, '_Server__get_source': <function Server.__get_source>, '_run_local': <function Server._run_local>, '_run_remote': <function Server._run_remote>, '_Server__add_to_active_tasks': <function Server.__add_to_active_tasks>, '_Server__stat_add_time': <function Server.__stat_add_time>, '_Server__stat_add_job': <function Server.__stat_add_job>, '_Server__update_active_rworkers': <function Server.__update_active_rworkers>, '_Server__gentid': <function Server.__gentid>, '__del__': <function Server.__del__>, 'destroy': <function Server.destroy>, '__dict__': <attribute '__dict__' of 'Server' objects>, '__weakref__': <attribute '__weakref__' of 'Server' objects>})
__init__(ncpus='autodetect', ppservers=(), secret=None, restart=False, proto=2, socket_timeout=3600)

Creates Server instance

ncpus - the number of worker processes to start on the local computer; if the parameter is omitted, it will be set to the number of processors in the system
ppservers - list of active parallel python execution servers to connect with
secret - passphrase for network connections; if omitted, a default passphrase will be used. It is highly recommended to use a custom passphrase for all network connections.
restart - restart the worker process after each task completion
proto - protocol number for the pickle module
socket_timeout - socket timeout in seconds, which is the maximum time a remote job is allowed to run. Increase this value if you have long-running jobs, or decrease it if connectivity to remote ppservers is often lost.

With ncpus = 1, all tasks are executed sequentially. For best performance, either use the default "autodetect" value or set ncpus to the total number of processors in the system.

__module__ = 'pp._pp'
__weakref__

list of weak references to the object (if defined)

_run_local(job, sfunc, sargs, worker)

Runs a job locally

_run_remote(job, sfunc, sargs, rworker)

Runs a job remotely

connect1(host, port, persistent=True)

Connects to a remote ppserver specified by host and port

default_port = 60000
default_secret = 'epo20pdosl;dksldkmm'
destroy()

Kills ppworkers and closes open files

get_active_nodes()

Returns active nodes as a dictionary (keys: nodes, values: ncpus)

get_ncpus()

Returns the number of local worker processes (ppworkers)

get_stats()

Returns job execution statistics as a dictionary

insert(sfunc, sargs, task=None)

Inserts function into the execution queue. It’s intended for internal use only (in ppserver).

print_stats()

Prints job execution statistics. Useful for benchmarking on clusters

set_ncpus(ncpus='autodetect')

Sets the number of local worker processes (ppworkers)

ncpus - the number of worker processes; if the parameter is omitted, it will be set to the number of processors in the system
submit(func, args=(), depfuncs=(), modules=(), callback=None, callbackargs=(), group='default', globals=None)

Submits function to the execution queue

func - function to be executed
args - tuple with arguments of 'func'
depfuncs - tuple with functions which might be called from 'func'
modules - tuple with module names to import
callback - callback function which will be called with argument list equal to callbackargs+(result,) as soon as the calculation is done
callbackargs - additional arguments for the callback function
group - job group; used when wait(group) is called to wait for the jobs in the given group to finish
globals - dictionary from which all modules, functions, and classes will be imported, for instance: globals=globals()

wait(group=None)

Waits for all jobs in a given group to finish. If group is omitted, waits for all jobs to finish.
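
For example, a hedged sketch of asynchronous submission with a callback and a named group, reusing the Server class documented above:

    from pathos.helpers.pp_helper import Server

    def square(x):
        return x*x

    def report(result):
        print('done:', result)

    job_server = Server(ncpus=2)
    # each callback is invoked with callbackargs+(result,) once done
    jobs = [job_server.submit(square, (n,), callback=report, group='squares')
            for n in range(4)]

    job_server.wait(group='squares')      # block until the group finishes
    print([job() for job in jobs])        # -> [0, 1, 4, 9]
    job_server.destroy()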

exception TimeoutError

Bases: multiprocess.context.ProcessError

__module__ = 'multiprocess.context'
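
A sketch of how this exception typically surfaces: a timed get on an asynchronous result that has not yet finished (the ProcessPool import is an assumption, not part of this module):

    import time
    from pathos.pools import ProcessPool
    from pathos.helpers.pp_helper import TimeoutError

    pool = ProcessPool(1)
    result = pool.apipe(time.sleep, 10)   # a deliberately slow task
    try:
        result.get(timeout=0.5)           # give up after half a second
    except TimeoutError:
        print('task did not finish in time')
    pool.terminate()
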
_ApplyResult

alias of multiprocess.pool.ApplyResult

_MapResult

alias of multiprocess.pool.MapResult

class _Task(server, tid, callback=None, callbackargs=(), group='default')

Bases: object

Class describing a single task (job)

Initializes the task

_Task__unpickle()

Unpickles the result of the task

__call__(raw_result=False)

Retrieves the result of the task

__dict__ = mappingproxy({'__module__': 'pp._pp', '__doc__': 'Class describing single task (job)\n ', '__init__': <function _Task.__init__>, 'finalize': <function _Task.finalize>, '__call__': <function _Task.__call__>, 'wait': <function _Task.wait>, '_Task__unpickle': <function _Task.__unpickle>, '__dict__': <attribute '__dict__' of '_Task' objects>, '__weakref__': <attribute '__weakref__' of '_Task' objects>})
__init__(server, tid, callback=None, callbackargs=(), group='default')

Initializes the task

__module__ = 'pp._pp'
__weakref__

list of weak references to the object (if defined)

finalize(sresult)

Finalizes the task.

For internal use only

wait()

Waits for the task