In addition to basic job creation, status viewing, and result retrieval, the cloud module lets you use callbacks and dependencies, read job output, kill jobs, and more.
After your function is run on PiCloud, you may wish to be instantly notified of its completion. Cloud allows such notifications through the use of the _callback keyword. Set this keyword to a single function or list of functions that should be called on the client when the job completes. For instance:
def bar(jid):
    # jid is an integer, so convert it before concatenating
    print(str(jid) + ' completed!')

cloud.call(foo, _callback=[bar])
Once foo completes, bar will be called on the client. If _callback is a list, the functions in the list will be called in order.
Note
Callbacks are not currently supported with cloud.map().
The _callback will only be called when a function completes successfully. Another list of callbacks, specified by the keyword argument _callback_on_error, will be called if the job fails (errors or stalls).
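The success/failure split described above can be illustrated locally without PiCloud. The following is a minimal sketch in Python 3 that uses the standard library's concurrent.futures as a stand-in for the cloud; run_with_callbacks and its on_success/on_error parameters are hypothetical names, not part of the cloud API, and the real client may invoke callbacks differently.

```python
from concurrent.futures import ThreadPoolExecutor


def run_with_callbacks(func, job_id, on_success=(), on_error=()):
    """Run func in a worker thread, then call the matching callbacks in order.

    Hypothetical helper: it mimics _callback / _callback_on_error by passing
    the job id to each callback, as the bar(jid) example above does.
    """
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(func)
        # exception() blocks until the job finishes; it returns None on success.
        failed = future.exception() is not None
    for callback in (on_error if failed else on_success):
        callback(job_id)
    return not failed


def ok_job():
    return 42


def bad_job():
    raise ValueError("boom")


completed, errored = [], []
run_with_callbacks(ok_job, 1, on_success=[completed.append], on_error=[errored.append])
run_with_callbacks(bad_job, 2, on_success=[completed.append], on_error=[errored.append])
print(completed, errored)  # [1] [2]
```

Only one of the two callback lists fires per job, which matches the behavior described for _callback and _callback_on_error.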
cloud.iresult() will return an iterator that will walk through job results in order, as they become available. This function is similar to cloud.result(), which blocks until every result is available.
Example:
import time
import cloud

def square(x):
    time.sleep(x)
    return x*x

jids = cloud.map(square, range(8))
for y in cloud.iresult(jids):
    print(y)
# will print 0, 1, 4, 9, 16, 25, 36, 49. Printing will start before every job has completed.
As the iterator runs before all jobs complete, error handling is a bit different than with cloud.result(). cloud.iresult() raises an exception only when the iterator tries to produce the result of an errored job.
num_in_parallel controls how many results can be pre-fetched (before being iterated over). Set this to 0 to allow the system’s maximum number of results to be pre-fetched. Higher numbers can improve the performance of the iterator, at the cost of higher memory consumption. The default value is 4.
As with cloud.result(), a timeout can be set. If the iterator cannot retrieve the next result within timeout seconds, the iterator will raise a cloud.CloudTimeoutError.
There are two ways to set timeouts:
If the timeout is set to None (default), timeouts will be disabled.
Note
On an error (other than a timeout), the iterator does not exhaust. Continuing to call next will successfully get the next corresponding result.
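The iterator behavior described above (results in order, an exception only when the errored job's turn comes, and no exhaustion after an error) can be modeled locally. This is a sketch in Python 3 built on concurrent.futures, not the cloud implementation; ResultIterator is a hypothetical class, and leaving the position unchanged after a timeout is an assumption the text does not confirm.

```python
import time
from concurrent.futures import ThreadPoolExecutor


class ResultIterator:
    """Yield job results in submission order (hypothetical local model)."""

    def __init__(self, futures, timeout=None):
        self._futures = list(futures)
        self._pos = 0
        self._timeout = timeout  # None disables the timeout

    def __iter__(self):
        return self

    def __next__(self):
        if self._pos >= len(self._futures):
            raise StopIteration
        future = self._futures[self._pos]
        # Raises concurrent.futures.TimeoutError if the job is not done in
        # time; the position is then left unchanged (an assumption).
        error = future.exception(timeout=self._timeout)
        self._pos += 1  # advance even on error, so the iterator is not exhausted
        if error is not None:
            raise error
        return future.result()


def square(x):
    if x == 2:
        raise ValueError("job 2 failed")
    time.sleep(0.02 * (3 - x))  # later jobs finish first; output order is unchanged
    return x * x


collected = []
with ThreadPoolExecutor(max_workers=4) as pool:
    results = ResultIterator([pool.submit(square, x) for x in range(4)])
    while True:
        try:
            collected.append(next(results))
        except ValueError as exc:
            collected.append("error: %s" % exc)
        except StopIteration:
            break
print(collected)  # [0, 1, 'error: job 2 failed', 9]
```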
If a job is taking too long (or worse, stuck in an infinite loop), you can abort it with cloud.kill().
Example:
def infinite_loop():
    while True:
        pass

jid = cloud.call(infinite_loop)  # start a job which will never end
cloud.kill(jid)                  # at least until you kill it
You can also pass in a sequence of jids to kill. Killed jobs will have their status set to killed. Any job dependent on a killed job will stall.
Note
It may take several seconds to kill a job.
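As a local analogy only, the same pattern (start something that never ends, kill it, then wait for the kill to take effect) can be tried with a child process. The sketch below is Python 3 and uses the standard subprocess module; it does not touch PiCloud.

```python
import subprocess
import sys

# Start a child interpreter that spins forever, like infinite_loop() above.
proc = subprocess.Popen([sys.executable, "-c", "while True: pass"])

still_running = proc.poll() is None  # poll() returns None while the child is alive
proc.kill()                          # request termination
proc.wait(timeout=10)                # the kill is not instantaneous, so wait for it

print(still_running, proc.returncode is not None)
```

Waiting after the kill plays the same role as allowing a few seconds for cloud.kill() to take effect.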
If your job writes to standard output or standard error, PiCloud allows you to easily view such output with cloud.info(). This function takes two arguments: a single jid or a sequence of jids, and info_requested, a single string or sequence of strings stating the "information" desired. Possible values of info_requested include "stdout" and "stderr", which read what the job has printed to standard output and standard error respectively.
As this function is intended primarily for debugging, cloud.info() returns a dictionary whose keys are jids. The value of each entry is another dictionary, whose keys consist of the elements that were in info_requested. These keys in turn map to relevant information about the job, e.g. result[23]["stdout"] will map to a string representation of job 23's standard output.
Example:
import sys
import cloud

def foo():
    print("Output")
    sys.stderr.write("An Error\n")

jid = cloud.call(foo)
cloud.join(jid)
cloud.info(jid, ['stderr', 'stdout'])
# returns {jid: {'stderr': 'An Error\n', 'stdout': 'Output\n'}}
You can also view output in the web view for jobs.
Note
Only the last 1,000 lines of output can be accessed. Also, output is not available from the PiCloud server until a job has completed or errored. In simulation and cloud.mp, output is available immediately after being flushed.
Warning
If you are inspecting a killed job, note that only information flushed to stdout or stderr will show. Text that was in output buffers when a job was killed will not be shown. You can avoid this problem by manually flushing output with sys.stdout.flush() and sys.stderr.flush().
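To make the nested return value of cloud.info() concrete, the sketch below builds a dictionary of the same shape locally by capturing a function's output. It is Python 3 and uses only the standard library; run_and_capture is a hypothetical helper, not something the cloud module provides.

```python
import contextlib
import io
import sys


def run_and_capture(jid, func):
    """Run func and return {jid: {'stdout': ..., 'stderr': ...}}.

    Hypothetical local helper shaped like the cloud.info() result above.
    """
    out, err = io.StringIO(), io.StringIO()
    with contextlib.redirect_stdout(out), contextlib.redirect_stderr(err):
        func()
    return {jid: {"stdout": out.getvalue(), "stderr": err.getvalue()}}


def foo():
    print("Output")
    print("An Error", file=sys.stderr)


info = run_and_capture(23, foo)
print(repr(info[23]["stdout"]))  # 'Output\n'
print(repr(info[23]["stderr"]))  # 'An Error\n'
```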
cloud.info() may also be used to get job runtime information. If info_requested includes "runtime", the value of result[jid]["runtime"] will be the amount of real time that the job jid ran for.
Note
Runtime is not available from the PiCloud server until the job has completed or errored. In simulation and cloud.mp, runtime is available as the job runs.
PiCloud recognizes that you may wish to erase some of your data from our servers. Use the cloud.delete() function to remove information about one or more completed jobs from PiCloud. Of course, after removing a job, you will not be able to access its status, result, runtime, etc.
cloud.delete() is also important when using the simulator or cloud.mp. See the memory issues section for more information.
When using the web viewer, it is useful to have your functions labeled to easily identify and search for jobs. Use the _label keyword argument to assign a string label to your function.
Example:
cloud.call(foo,_label='foo job') #"foo" job will have label of 'foo job'
PiCloud has the powerful ability for you to dictate the ordering of jobs. For instance, you may have a function download() that downloads images and a function find_faces() that detects faces in the images. You can specify with the _depends_on keyword in either cloud.call() or cloud.map() which jobs must complete before a new function is run. In our example:
download_jids = cloud.map(download, image_id_list)
find_faces_jids = cloud.map(find_faces, image_id_list, _depends_on=download_jids)
The _depends_on keyword accepts either an individual jid or a sequence of jids.
Recall that if job B _depends_on job A and job A errors, job B will never run and its status will be set to stalled.
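The stalling rule can be sketched with a toy scheduler. This is an illustration in Python 3, not how PiCloud schedules jobs: run_jobs is a hypothetical function, jobs run sequentially, and the status strings "done" and "error" are assumed labels (only "stalled" is taken from the text).

```python
def run_jobs(jobs):
    """Run jobs in listed order and return {name: status}.

    jobs is a list of (name, func, depends_on) tuples, where depends_on
    lists names that appear earlier in the list.
    """
    status = {}
    for name, func, depends_on in jobs:
        # A job whose dependency did not finish cleanly never runs.
        if any(status[dep] != "done" for dep in depends_on):
            status[name] = "stalled"
            continue
        try:
            func()
            status[name] = "done"
        except Exception:
            status[name] = "error"
    return status


def download():
    raise RuntimeError("network down")


def find_faces():
    return "faces"


def report():
    return "ok"


statuses = run_jobs([
    ("download", download, []),
    ("find_faces", find_faces, ["download"]),
    ("report", report, []),
])
print(statuses)
# {'download': 'error', 'find_faces': 'stalled', 'report': 'done'}
```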
Even in cases where dependencies are not strictly necessary, they should be used. Consider the following modified code from the Technical Overview:
def job1():
    return 1

def job2(jid):
    return cloud.result(jid)

if __name__ == '__main__':  # Client entry point
    jid1 = cloud.call(job1)  # jid of job1
    jid2 = cloud.call(job2, jid1, _depends_on=jid1)  # dependency!
    ret = cloud.result(jid2)  # the result of both
If _depends_on were not present (as in the original), job2 would run simultaneously with job1. Not only are you charged for a job that is just busy waiting (on job1), but also you have a useless job counting against your parallelism limit.
By setting the _profile keyword to True, you can request that PiCloud profile a job and present the timing analysis on the Jobs page. For most workloads that rely heavily on I/O or Python C extension calls (e.g. numpy), the overhead is trivial. However, if your job makes a large number of Python function calls, profiling overhead may become significant.
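To get a feel for what such a timing analysis contains, a function can be profiled locally. The sketch below is Python 3 and uses the standard cProfile and pstats modules as a stand-in; the text does not say which profiler PiCloud uses, and tiny and many_calls are made-up functions chosen to generate many Python-level calls.

```python
import cProfile
import io
import pstats


def tiny(x):
    return x + 1


def many_calls(n):
    # Every iteration is a Python-level function call, the case in which
    # profiling overhead is most noticeable.
    total = 0
    for _ in range(n):
        total = tiny(total)
    return total


profiler = cProfile.Profile()
result = profiler.runcall(many_calls, 10000)

stream = io.StringIO()
stats = pstats.Stats(profiler, stream=stream)
stats.sort_stats("cumulative").print_stats(5)
report = stream.getvalue()
print(report)
```

The printed table lists call counts and cumulative time per function, so tiny shows up with 10000 calls.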
No computing infrastructure is immune to hardware failure, including that which PiCloud runs on. There is an extremely small possibility that a computer will fail while processing a job. While such a failure will never occur for most users, if you are running millions of jobs, the possibility of a hardware failure affecting one rises.
By default, PiCloud assumes it can safely restart jobs that fail while executing. However, this is not always true; a failed job may be manipulating some form of external state (e.g. writing to a database) and blindly restarting the job could cause data corruption.
If your job writes to an external source and you cannot design your job to recover from failure, you may wish to set the _restartable keyword to False. A hardware failure will then result in an exception being raised, rather than the job being restarted on another computer.
Example:
def foo():
    """Writes to 2 databases. If a failure occurs before the second write, we have a problem."""
    # write to a database
    # write to another database

cloud.call(foo, _restartable=False)  # If foo() fails due to hardware failure, alert user with exception
Example 2:
def square(x):
    return x*x

# note that square does not write to external sources
cloud.map(square, range(10000), _restartable=True)  # Note that _restartable=True can be omitted
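The risk that _restartable=False guards against can be reproduced in a few lines. This is a local sketch in Python 3 under stated assumptions: HardwareFailure, run_job, and max_restarts are hypothetical, and the failure is simulated rather than real.

```python
class HardwareFailure(Exception):
    """Hypothetical stand-in for a machine dying mid-job."""


def run_job(func, restartable=True, max_restarts=3):
    """Run func, restarting it after a simulated hardware failure if allowed."""
    restarts = 0
    while True:
        try:
            return func()
        except HardwareFailure:
            if not restartable or restarts >= max_restarts:
                raise  # surface the failure to the caller instead of retrying
            restarts += 1


writes = []              # stands in for an external database
attempts = {"count": 0}


def write_twice():
    attempts["count"] += 1
    writes.append("first write")
    if attempts["count"] == 1:
        raise HardwareFailure("machine failed before the second write")
    writes.append("second write")


run_job(write_twice, restartable=True)
restarted_writes = list(writes)
print(restarted_writes)  # ['first write', 'first write', 'second write']

attempts["count"] = 0
del writes[:]
try:
    run_job(write_twice, restartable=False)
    surfaced = None
except HardwareFailure as exc:
    surfaced = str(exc)
print(writes, surfaced)
```

With restarts enabled the first write is applied twice; with restarts disabled the caller sees the exception and can decide how to recover.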
As mentioned earlier, PiCloud uses an enhanced pickler to support additional datatypes and provide debugging features. With large amounts of data though, such enhancements may lead to slowdown.
The _fast_serialization keyword allows for certain features of the serializer to be bypassed. It controls both the serialization of arguments and the result; the function is always serialized with the full enhanced pickler. This keyword is set to an integer, with meaning:
Also be aware that enabling serialization logging (described below) can result in slower serialization.
Warning
Using _fast_serialization=2 can result in strange, unclear exceptions during pickling and unpickling. If you experience such errors with this mode, consider using level 1.
PiCloud will try to run your jobs as soon as possible, but if you are transmitting many long-lasting jobs, your jobs may be queued for a small amount of time. If you suspect your jobs will be backlogged, you may wish to utilize the _priority keyword argument. Both cloud.call() and cloud.map() support such an argument, which hints to PiCloud which jobs should be run first. Jobs are typically run in FIFO order, but PiCloud will try to run jobs with lower priority numbers before those with higher priority numbers. The default priority is 5.
Note
Priorities only serve as a guide to PiCloud. Jobs are not guaranteed to run in priority order.
Example:
cloud.call(foo,_priority=1) #calls foo() with high priority
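The ordering rule (lower priority numbers first, FIFO among equal priorities, default 5) can be sketched with a heap. This Python 3 example is an idealization: JobQueue is a hypothetical class that enforces strict priority order, whereas PiCloud treats priorities only as a hint.

```python
import heapq
import itertools

DEFAULT_PRIORITY = 5  # default stated in the text above


class JobQueue:
    """Toy queue: lowest priority number first, FIFO within a priority."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # submission order breaks ties

    def submit(self, name, priority=DEFAULT_PRIORITY):
        heapq.heappush(self._heap, (priority, next(self._counter), name))

    def pop(self):
        return heapq.heappop(self._heap)[2]

    def __len__(self):
        return len(self._heap)


queue = JobQueue()
queue.submit("a")
queue.submit("b")
queue.submit("urgent", priority=1)
queue.submit("background", priority=9)
queue.submit("c")

order = [queue.pop() for _ in range(len(queue))]
print(order)  # ['urgent', 'a', 'b', 'c', 'background']
```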
cloudconf.py manages configuration for the cloud module. It is a regular python script file which is run when cloud is imported. You can use this file to set your account information, use the simulator by default, or tweak more advanced settings.
cloudconf.py is located at $HOME/.picloud on UNIX-like systems (Linux, OSX) and %APPDATA%/picloud on Windows (replace picloud with picloud3 if you are using python3.x). The exact directory that this configuration file is stored in is returned by cloud.getconfigpath().
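The location rule above can be written out as a small function. This is only an illustration in Python 3 of the rule as stated; default_config_dir is a hypothetical helper, and cloud.getconfigpath() remains the authoritative source for the actual directory.

```python
import os
import sys


def default_config_dir(platform=sys.platform, environ=os.environ,
                       py_major=sys.version_info[0]):
    """Return the configuration directory described in the text (hypothetical helper)."""
    name = "picloud3" if py_major >= 3 else "picloud"
    if platform.startswith("win"):
        # %APPDATA%/picloud on Windows
        return environ.get("APPDATA", "") + "/" + name
    # $HOME/.picloud on UNIX-like systems
    return environ.get("HOME", "") + "/." + name


print(default_config_dir("linux", {"HOME": "/home/me"}, 2))  # /home/me/.picloud
print(default_config_dir("linux", {"HOME": "/home/me"}, 3))  # /home/me/.picloud3
```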
If you wish to set configuration at runtime, use the cloud.config interface. All of the settings that exist in cloudconf.py exist as properties of this interface. Note that changes do not take effect immediately; you must call cloud.config.commit() to update cloud.
Warning
Calling cloud.config.commit() will disconnect all currently running cloud instances and reset any earlier cloud.setkey() calls.
With cloud.account, it is possible to provision API keys programmatically. See documentation on cloud.account for more information.
The cloud module includes a stripped-down version of the PiCloud server, which can be used on a single machine. The interface of the cloud.mp module mirrors that of cloud, except that all jobs are run locally through the use of the multiprocessing library.
Specifically, a pool of child processes is created that will run jobs. As multiple processes are used, jobs can be run simultaneously on multiple CPU cores.
Note
As with the regular cloud module, cloud.mp can be used in functions executing on PiCloud.
Warning
This is not a complete simulation of the PiCloud server. Also, recall that deadlock is possible if jobs call cloud.call() recursively; be sure that the maximum number of jobs that need to be run simultaneously is lower than the configured size of the process pool. See the pitfall section for more information.
Every time you run a job with cloud.mp, the job (result, status, etc.) is held in memory. As more jobs run, more memory will be consumed by the Python process. There are two methods that can be used to prevent this ever-growing memory use:
The child process pool is run in daemon mode. When the main process terminates, so will the children, killing any active job. To ensure that all of your jobs are run, you must use cloud.join() or cloud.result() to block until they are complete.
To verify that your code works, you may wish to use the PiCloud simulator. The simulator is effectively just cloud.mp with separate configuration ability. With the simulator, code is significantly easier to debug (esp. with breakpoints); the simulator also verifies that your data can be sent over to the PiCloud servers.
To use the simulator run:
cloud.start_simulator()
Or edit cloudconf.py, and set use_simulator to True.
The simulator is not a perfect simulation of PiCloud. In the simulator, you will have access directly to your computer’s file system, which of course is not available on PiCloud. Furthermore, module name conflicts are not detected nor is the use of a non-synchronized extension. cloud.files is not simulated; it runs exactly as it would on PiCloud. Finally, the termination issues that apply to cloud.mp also apply to the simulator.
Cloud utilizes the logging module to note important events. All log messages are sent to the Cloud logger, which your own module can add handlers to.
By default, all log messages at INFO level or higher are written to $HOME/.picloud/cloud.log. You can change this behavior by setting log_level in cloudconf.py.
To properly support WSGI, no logs are printed to stderr by default. If you wish to change this behavior, set print_log_level in cloudconf.py to something below CRITICAL.
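Adding your own handler follows the usual logging pattern. The sketch below is Python 3 and assumes the logger is registered under the name "Cloud", as the text suggests; the log messages are made up for the demonstration and are emitted by the example itself, not by the cloud module.

```python
import io
import logging

# "Cloud" is the logger name taken from the text above (an assumption about
# the exact string the cloud module registers).
logger = logging.getLogger("Cloud")
logger.setLevel(logging.INFO)

buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setLevel(logging.WARNING)  # this handler only accepts WARNING and above
handler.setFormatter(logging.Formatter("%(name)s %(levelname)s %(message)s"))
logger.addHandler(handler)

logger.info("job 1 submitted")      # passes the logger, filtered out by the handler
logger.warning("job 1 is stalled")  # reaches the handler

captured = buffer.getvalue()
print(captured)  # Cloud WARNING job 1 is stalled
```

Replacing the StringIO buffer with sys.stderr would print the messages to the console instead.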
Cloud can log all data being serialized and sent to PiCloud. All data is serialized by the extended pickler.
In the cloud config, serialize_logging controls if the logs should be written for regular (network or mp) cloud and force_serialize_logging (in the simulation section) will switch on serialize_logging if the simulator is running. Their respective defaults are False and True. Note that _fast_serialization must also be set to 0 (default) for logging to occur.
The actual logs are found in the datalogs subdirectory of $HOME/.picloud/ ($HOME/.picloud3/ on python3). Inside that directory is one subdirectory per cloud adapter (Simulation, HTTPConnection, Multiprocessing). Within each adapter subdirectory are timestamped sub-directories noting when the cloud was opened.
The XML files within the timestamped directories describe transmitted serialized data. They are formatted as <call/map>.module.function_name.<sequence>.<func/args>.xml. For instance, if you run:
cloud.call(mymodule.foo,arg1,arg2)
you will see call.mymodule.foo.1.func.xml showing data describing mymodule.foo and call.mymodule.foo.1.args.xml showing data associated with arguments (arg1, arg2) being passed into mymodule.foo. The next identical cloud.call will produce a call.mymodule.foo.2.func.xml and a call.mymodule.foo.2.args.xml.
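The naming scheme can be expressed as a short function. This Python 3 sketch only reproduces the documented pattern; datalog_filename is a hypothetical helper, not part of the cloud module.

```python
def datalog_filename(kind, module, function_name, sequence, part):
    """Build a name of the form <call/map>.module.function_name.<sequence>.<func/args>.xml."""
    if kind not in ("call", "map"):
        raise ValueError("kind must be 'call' or 'map'")
    if part not in ("func", "args"):
        raise ValueError("part must be 'func' or 'args'")
    return "%s.%s.%s.%d.%s.xml" % (kind, module, function_name, sequence, part)


print(datalog_filename("call", "mymodule", "foo", 1, "func"))  # call.mymodule.foo.1.func.xml
print(datalog_filename("call", "mymodule", "foo", 2, "args"))  # call.mymodule.foo.2.args.xml
```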
A log might look something like:
<?xml version="1.0" encoding="utf-8"?>
<!-- Properties minimumObjectSizePrinted='0' typeCompositionShown='False' showPrimitives='False' -->
<PickledObject type='function' size='944' memo_id='26' isLambda='True' module='__main__' fileName='/home/astaley/workspace/AutoTagger/src/autotagger/cloud/tests/test_cloud.py' firstLineNumber='81' TransmittedFunctionCode='True'>
    <Attribute name='globals' type='dict' size='510' memo_id='27' numElements='1' keyType='str' valueType='function'>
        <Value key='conc' keyObjId='3085541792' type='function' size='504' memo_id='62' funcName='conc' module='autotagger2.scheduler' fileName='/usr/local/lib/python2.6/dist-packages/autotagger2/scheduler/__init__.py' firstLineNumber='8'>
            <Attribute name='attr' type='function' size='410' memo_id='55' funcName='osmod2' module='__main__' fileName='/home/astaley/workspace/AutoTagger/src/autotagger/cloud/tests/test_cloud.py' firstLineNumber='53' TransmittedFunctionCode='True'>
                <Attribute name='globals' type='dict' size='20' memo_id='56' numElements='1' keyType='str' valueType='module'>
                    <Value key='os' keyObjId='3085749056' type='module' size='14' memo_id='59' module='os'/>
                </Attribute>
                <Attribute name='closure' type='list' size='3' memo_id='60' numElements='0'/>
                <Attribute name='function_attributes' type='dict' size='3' memo_id='61' numElements='0'/>
            </Attribute>
        </Value>
    </Attribute>
    <Attribute name='closure' type='list' size='3' memo_id='63' numElements='0'/>
    <Attribute name='function_attributes' type='dict' size='3' memo_id='64' numElements='0'/>
</PickledObject>
This describes a transmitted function and all of its references (a child xml node is being referenced by its parent). The function has several attributes, such as the global variables it references. Note that the type of the globals is ‘dict’. The dictionary in turn references various Values (with ‘keys’ to reference them).
Other notable attributes:
- name: Name of an attribute in a class or function
- type: Type of this object
- size: Amount of space this data is taking in the serialized pickle (including references).
- memo_id: When an object is referenced multiple times, it is referenced by this id.
- fileName/firstLineNumber: Where a function is located
- numElements: Number of elements in a sequence type.
By default, primitives (strings, integers, floats, etc.) will not be printed out.
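Because the logs are plain XML, the size attribute can be used to find what dominates a pickle. The sketch below is Python 3 and uses the standard xml.etree.ElementTree module on an abbreviated copy of the sample log; largest_nodes is a hypothetical helper and SAMPLE omits several attributes and nested nodes for brevity.

```python
import xml.etree.ElementTree as ET

# Abbreviated from the sample log above (nested nodes and some attributes omitted).
SAMPLE = """<PickledObject type='function' size='944' memo_id='26' module='__main__'>
  <Attribute name='globals' type='dict' size='510' memo_id='27' numElements='1'>
    <Value key='conc' type='function' size='504' memo_id='62' funcName='conc'/>
  </Attribute>
  <Attribute name='closure' type='list' size='3' memo_id='63' numElements='0'/>
  <Attribute name='function_attributes' type='dict' size='3' memo_id='64' numElements='0'/>
</PickledObject>"""


def largest_nodes(xml_text, limit=3):
    """Return (size, type, label) for the largest nodes, biggest first."""
    root = ET.fromstring(xml_text)
    rows = []
    for node in root.iter():  # iter() visits the root and every descendant
        label = node.get("name") or node.get("key") or node.tag
        rows.append((int(node.get("size", 0)), node.get("type"), label))
    rows.sort(key=lambda row: row[0], reverse=True)
    return rows[:limit]


top = largest_nodes(SAMPLE)
for size, type_name, label in top:
    print(size, type_name, label)
# 944 function PickledObject
# 510 dict globals
# 504 function conc
```

Since sizes include references, a parent is always at least as large as its children, so the interesting entries are usually the largest leaves.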
When running cloud in multiprocessing mode, a dump of the serialized result will also be available. Standard output and standard error will also be written, using the same filename format as serialized data dumps.
To provide an easy way to replace calls to the multiprocessing Pool library, cloud.pool_interface exists. This module has nearly the same interface as multiprocessing.Pool. However, because it lacks many features of the cloud module, it is not recommended for production use.