In addition to basic job creation, status viewing, and result access, the cloud module lets you use callbacks and dependencies, read job output, kill jobs, and more.
After your function is run on PiCloud, you may wish to be instantly notified of its completion. Cloud allows such notifications through the use of the _callback keyword. Set this keyword to a single function or list of functions that should be called on the client when the job completes. For instance:
def bar(jid):
    # jids are integers, so format the jid rather than concatenating it
    print('%s completed!' % jid)

cloud.call(foo, _callback=[bar])
Once foo completes, bar will be called on the client. If _callback is a list, the functions in the list will be called in order.
Note
Callbacks are not currently supported with cloud.map().
The _callback will only be called when a function completes successfully. Another list of callbacks, specified by the keyword argument _callback_on_error, will be called if the job fails (errors or stalls).
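The split between success and error callbacks can be pictured with a purely local stand-in. This is a minimal sketch that does not touch PiCloud: run_job_locally and its parameter names are hypothetical, the jid is supplied by hand, and it assumes that error callbacks receive the jid just as success callbacks do.

```python
def run_job_locally(jid, func, callbacks=(), callbacks_on_error=()):
    """Hypothetical local stand-in for a job: run func, then fire callbacks.

    Success callbacks run in list order when func returns; error
    callbacks run instead if func raises. Each callback gets the jid.
    """
    try:
        result = func()
    except Exception:
        for cb in callbacks_on_error:
            cb(jid)
        return None
    for cb in callbacks:
        cb(jid)
    return result


events = []  # records which callbacks fired, in order


def ok_job():
    return 42


def bad_job():
    raise ValueError("boom")


run_job_locally(1, ok_job,
                callbacks=[lambda jid: events.append(("first", jid)),
                           lambda jid: events.append(("second", jid))],
                callbacks_on_error=[lambda jid: events.append(("error", jid))])
run_job_locally(2, bad_job,
                callbacks=[lambda jid: events.append(("first", jid))],
                callbacks_on_error=[lambda jid: events.append(("error", jid))])
```

After both calls, events holds the two success callbacks for job 1 in list order, followed by the single error callback for job 2.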
cloud.iresult() will return an iterator that will walk through job results in order, as they become available. This function is similar to cloud.result(), which blocks until every result is available.
Example:
import time

def square(x):
    time.sleep(x)
    return x*x

jids = cloud.map(square, range(8))
for y in cloud.iresult(jids):
    print(y)
# will print 0, 1, 4, 9, 16, 25, 36, 49. Printing will start before every job has completed.
As the iterator runs before all jobs complete, error handling is a bit different than with cloud.result(). cloud.iresult() raises an exception only once the iterator tries to produce the result of an errored job.
If ignore_errors is set, no exceptions are thrown when encountering errored jobs. Instead, a cloud.CloudException describing the error is returned by the iterator.
num_in_parallel controls how many results can be pre-fetched (before being iterated over). Set this to 0 to allow the system’s maximum number of results to be pre-fetched. Higher numbers can improve the performance of the iterator, at the cost of higher memory consumption. The default value is 4.
As with cloud.result(), a timeout can be set. If the iterator cannot retrieve the next result within timeout seconds, the iterator will raise a cloud.CloudTimeoutError.
There are two ways to set timeouts:
If the timeout is set to None (default), timeouts will be disabled.
Note
On an error (other than a timeout), the iterator does not exhaust. Continuing to call next will successfully get the next corresponding result.
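The error-handling rules above can be illustrated without PiCloud. The class below is a hypothetical local iterator, not the library's implementation: zero-argument callables stand in for jobs, and neither prefetching (num_in_parallel) nor timeouts are modeled. It only shows that an errored job raises when its turn comes, that ignore_errors returns the exception object instead, and that iteration can continue after an error.

```python
class OrderedResults(object):
    """Hypothetical iterator mimicking the error semantics described above."""

    def __init__(self, jobs, ignore_errors=False):
        self._jobs = list(jobs)
        self._pos = 0
        self._ignore_errors = ignore_errors

    def __iter__(self):
        return self

    def __next__(self):
        if self._pos >= len(self._jobs):
            raise StopIteration
        job = self._jobs[self._pos]
        self._pos += 1  # advance first, so an error does not block later results
        try:
            return job()
        except Exception as exc:
            if self._ignore_errors:
                return exc  # hand the exception back as a value
            raise

    next = __next__  # Python 2 spelling of the same method


def fail():
    raise ValueError("job errored")


jobs = [lambda: 0, fail, lambda: 4]

strict = OrderedResults(jobs)
first = next(strict)
try:
    next(strict)
    raised = False
except ValueError:
    raised = True
third = next(strict)  # the iterator was not exhausted by the error

lenient = list(OrderedResults(jobs, ignore_errors=True))
```

A plain generator would not work for this sketch, because a generator that raises is finished; an explicit iterator class keeps its position across the exception.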
If a job is taking too long (or worse, stuck in an infinite loop), you can abort it with cloud.kill().
Example:
def infinite_loop():
    while True:
        pass

jid = cloud.call(infinite_loop)  # start a job which will never end
cloud.kill(jid)  # at least until you kill it
You can also pass in a sequence of jids to kill. Killed jobs will have their status set to killed. Any job dependent on a killed job will stall.
If you pass no arguments in, e.g. cloud.kill(), you will kill all queued and processing jobs.
Note
It may take several seconds to kill a job. After killing a job, you can join it to view a traceback.
PiCloud allows you to see what your job is writing to standard output and error easily with cloud.info(). This function takes two arguments: a single jid or a sequence of jids, and a single string or sequence of strings, info_requested, stating the “information” desired. Possible info_requested values include "stdout" and "stderr", which return what the job has printed so far to standard output and standard error respectively.
As this function is intended primarily for debugging, cloud.info() returns a dictionary whose keys are jids. The value of each entry is another dictionary, whose keys are the elements of info_requested. These keys in turn map to the relevant information about the job, e.g. result[23]["stdout"] will map to a string representation of job 23’s standard output.
Example:
import sys

def foo():
    print("Output")
    sys.stderr.write("An Error\n")

jid = cloud.call(foo)
cloud.join(jid)
cloud.info(jid, ['stderr', 'stdout'])
# returns {jid: {'stderr': 'An Error\n', 'stdout': 'Output\n'}}
You can also view output in the web view for jobs.
Note
Only the last 64,000 characters of output produced can be accessed.
Warning
If you are inspecting a killed job, note that only information flushed to stdout or stderr will show. Text that was in output buffers when a job was killed will not be shown. You can avoid this problem by manually flushing output with sys.stdout.flush() and sys.stderr.flush().
cloud.info() may also be used to get job runtime information, even while the job is still running. If info_requested includes "runtime", the value of result[jid]["runtime"] will be the amount of real time that the job jid ran for (or is running for).
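Because the return value is a dictionary of dictionaries, a small helper can flatten it for reading. The sketch below works on a hand-written sample shaped like the structure described above; summarize_info is a hypothetical helper, and the sample values (including the runtime of 1.5) are invented for illustration.

```python
def summarize_info(info):
    """Flatten a {jid: {field: value}} dictionary into sorted text lines."""
    lines = []
    for jid in sorted(info):
        for field in sorted(info[jid]):
            value = info[jid][field]
            lines.append("job %s %s: %r" % (jid, field, value))
    return lines


# Sample data shaped like the documented return value (values invented).
sample = {23: {"stdout": "Output\n", "stderr": "An Error\n", "runtime": 1.5}}
report = summarize_info(sample)
```

Using %r keeps trailing newlines in the captured output visible instead of printing them.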
PiCloud recognizes that you may wish to erase some of your data from our servers. Use the cloud.delete() function to remove information about one or more completed jobs from PiCloud. Of course, after removing a job, you will not be able to access its status, result, runtime, etc.
cloud.delete() is also important when using the simulator or cloud.mp. See the memory issues section for more information.
Note
A job must have completed (done, errored, killed) for it to be deleted; otherwise, a CloudException will be raised.
PiCloud will try to run your jobs as quickly as possible, but if you are transmitting many long-lasting jobs, your jobs may be queued for many seconds. If you suspect your jobs will be backlogged, you may wish to utilize the priority keyword argument. Both cloud.call() and cloud.map() support such an argument, which controls which jobs will be dequeued first. Jobs with the same priority (the default is 5) are run in FIFO order. When two jobs have different priorities, the job with the lower priority number will be run first.
Example:
cloud.call(foo,priority=1) #calls foo() with high priority
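The dequeue order described above (lower priority number first, FIFO among equal priorities) can be modeled locally with a heap. This is only a sketch of the ordering rule; JobQueue, submit, and dequeue are hypothetical names and say nothing about how PiCloud's scheduler is implemented.

```python
import heapq
import itertools


class JobQueue(object):
    """Hypothetical queue: lowest priority number first, FIFO within a priority."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # submission order breaks ties

    def submit(self, name, priority=5):
        heapq.heappush(self._heap, (priority, next(self._counter), name))

    def dequeue(self):
        _priority, _order, name = heapq.heappop(self._heap)
        return name


queue = JobQueue()
queue.submit("a")                       # default priority 5
queue.submit("b")                       # default priority 5, submitted after "a"
queue.submit("urgent", priority=1)
queue.submit("background", priority=9)
order = [queue.dequeue() for _ in range(4)]
```

Here order comes out as urgent, a, b, background: the priority-1 job jumps the queue, and the two default-priority jobs keep their submission order.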
When using the web viewer, it is useful to have your functions labeled to easily identify and search for jobs. Use the _label keyword argument to assign a string label to your function.
Example:
cloud.call(foo,_label='foo job') #"foo" job will have label of 'foo job'
PiCloud lets you dictate the order in which jobs run. For instance, you may have a function download() that downloads images and a function find_faces() that detects faces in the images. You can specify with the _depends_on keyword in either cloud.call() or cloud.map() which jobs must complete before a new function is run. In our example:
download_jids = cloud.map(download,image_id_list)
find_faces_jids = cloud.map(find_faces, image_id_list, _depends_on=download_jids)
The _depends_on keyword accepts either an individual jid or a sequence of jids.
Recall that if job B _depends_on job A and job A errors, job B will never run and its status will be set to stalled.
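To see which jobs would end up stalled after a failure, the dependency rule can be applied to a small graph by hand. The function below is a local sketch, not part of the cloud module; find_stalled is a hypothetical name, and it assumes that stalling propagates transitively (a job that depends on a stalled job also stalls).

```python
def find_stalled(depends_on, errored):
    """Return the set of jobs that can never run.

    depends_on maps each job to the list of jobs it depends on;
    errored is the set of jobs that failed. A job stalls if any of its
    dependencies errored or is itself stalled.
    """
    stalled = set()
    changed = True
    while changed:  # repeat until no new stalled job is found
        changed = False
        for job, deps in depends_on.items():
            if job in stalled or job in errored:
                continue
            if any(d in errored or d in stalled for d in deps):
                stalled.add(job)
                changed = True
    return stalled


graph = {
    "download": [],
    "find_faces": ["download"],
    "report": ["find_faces"],
    "other": [],
}
stalled = find_stalled(graph, errored={"download"})
```

With download errored, both find_faces and report are reported as stalled, while the unrelated job is unaffected.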
Even in cases where dependencies are not strictly necessary, they should be used. Consider the following modified code from the Technical Overview:
def job1():
    return 1

def job2(jid):
    return cloud.result(jid)

if __name__ == '__main__':  # Client entry point
    jid1 = cloud.call(job1)  # jid of job1
    jid2 = cloud.call(job2, jid1, _depends_on=jid1)  # dependency!
    ret = cloud.result(jid2)  # the result of both
If _depends_on were not present (as in the original), job2 would run simultaneously with job1. Not only are you charged for a job that is just busy waiting (on job1), but also you have a useless job counting against your parallelism limit.
By setting the _profile keyword to True, you can request that PiCloud profile a job and present the timing analysis on the Jobs page. For most workloads that use extensive I/O or Python C extension calls (e.g. numpy), the overhead is trivial. However, if your job makes a large number of Python function calls, profiling overhead may become significant.
No computing infrastructure is immune to hardware failure, including that which PiCloud runs on. There is an extremely small possibility that a computer will fail while processing a job. While such a failure will never occur for most users, if you are running millions of jobs, the possibility of a hardware failure affecting one rises.
By default, PiCloud assumes it can safely restart jobs that fail while executing. However, this is not always true; a failed job may be manipulating some form of external state (e.g. writing to a database) and blindly restarting the job could cause data corruption.
If your job writes to an external source and you cannot design your job to recover from failure, you may wish to set the _restartable keyword to False. A hardware failure will then result in an exception being raised, rather than the job being restarted on another computer.
Example:
def foo():
    """writes to 2 databases. If a failure occurs before the second write, we have a problem"""
    # write to a database
    # write to another database

cloud.call(foo, _restartable=False)  # If foo() fails due to hardware failure, alert user with exception
Example 2:
def square(x):
    return x*x

# note that square does not write to external sources
cloud.map(square, range(10000), _restartable=True)  # Note that _restartable=True can be omitted
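The difference between the two settings can be simulated locally. In this sketch, HardwareFailure, run_with_restart_policy, and max_attempts are all hypothetical: the failure is raised by the job itself to imitate a machine dying, and the retry limit is an assumption of the sketch rather than documented PiCloud behavior.

```python
class HardwareFailure(Exception):
    """Hypothetical exception standing in for a machine dying mid-job."""


def run_with_restart_policy(job, restartable=True, max_attempts=3):
    """Run job(); on HardwareFailure either retry or surface the error.

    Returns a (result, attempts) pair on success.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return job(), attempts
        except HardwareFailure:
            if not restartable or attempts >= max_attempts:
                raise


calls = []


def flaky_square():
    calls.append(1)
    if len(calls) == 1:  # fail on the first attempt only
        raise HardwareFailure("machine failed")
    return 7 * 7


result, attempts = run_with_restart_policy(flaky_square, restartable=True)


def flaky_write():
    raise HardwareFailure("machine failed after first write")


try:
    run_with_restart_policy(flaky_write, restartable=False)
    surfaced = False
except HardwareFailure:
    surfaced = True
```

The side-effect-free job is silently rerun and succeeds on its second attempt, while the non-restartable job surfaces the failure to the caller after a single attempt.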
As mentioned earlier, PiCloud uses an enhanced pickler to support additional datatypes and provide debugging features. With large amounts of data though, such enhancements may lead to slowdown.
The _fast_serialization keyword allows for certain features of the serializer to be bypassed. It controls both the serialization of arguments and the result; the function is always serialized with the full enhanced pickler. This keyword is set to an integer, with meaning:
Also be aware that enabling serialization logging (described below) can result in slower serialization.
Warning
Using _fast_serialization=2 can result in strange, unclear exceptions during pickling and unpickling. If you experience such errors with this mode, consider using level 1.
By default, when an instance is serialized, all of its attributes also will be serialized. This behavior can cause problems though when attributes point to data structures that are unnecessary on PiCloud (a cache) or cannot be transported (a socket).
You can solve this issue by using Python’s built-in __getstate__ hook. A __getstate__ method allows objects to alter which variables are saved when they are serialized.
PiCloud provides the following __getstate__ code to mark variables transient (a term from Java). When a variable is marked transient, it will not be serialized; when the instance is recreated on PiCloud’s server, a transient variable will hold the class’ default value. Be aware that an instance’s __init__ method is not called during unpickling, so the variable won’t exist on PiCloud if it is not defined at class level. If you wish to modify a variable during unpickling, make use of the built-in __setstate__ hook.
Without further ado, here’s the code:
def __getstate__(self):
    transient = []  # fill in variables that should not be pickled here
    my_dct = self.__dict__.copy()
    for k in list(my_dct.keys()):
        if k in transient:
            del my_dct[k]
    return my_dct
To mark variables transient, add their names to the special transient list shown above. For example:
class Obj(object):
    a = 39
    b = 28

    def __getstate__(self):
        transient = ['b']  # fill in variables that should not be pickled here
        my_dct = self.__dict__.copy()
        for k in list(my_dct.keys()):
            if k in transient:
                del my_dct[k]
        return my_dct
When an instance of type Obj is pickled, the value of b will not be saved. After unpickling the instance is guaranteed to have b == 28.
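The behavior can be checked locally with the standard pickle module, which is a reasonable stand-in here even though PiCloud uses its own enhanced pickler. In this sketch the class Cached and its cache attribute are hypothetical additions; the __setstate__ method illustrates rebuilding a transient value on unpickling.

```python
import pickle


class Cached(object):
    a = 39
    b = 28          # class-level default, seen again after unpickling
    cache = None    # class-level default for the transient cache

    def __init__(self):
        self.a = 1
        self.b = 99
        self.cache = {"expensive": "value"}

    def __getstate__(self):
        transient = ['b', 'cache']  # names that should not be pickled
        state = self.__dict__.copy()
        for name in transient:
            state.pop(name, None)
        return state

    def __setstate__(self, state):
        # __init__ is not called on unpickling, so restore state by hand
        self.__dict__.update(state)
        self.cache = {}  # rebuild the transient cache as an empty dict


original = Cached()
clone = pickle.loads(pickle.dumps(original))
```

After the round trip, clone.a keeps the instance value 1, clone.b falls back to the class default 28, and clone.cache is the fresh empty dictionary created in __setstate__; the original object is unchanged.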
As noted in the technical overview, after a function completes, subsequent jobs may be run in the same interpreter. This optimization speeds up function execution time significantly, but in rare cases may cause problems if your function assumes it is only called once per process. Such problematic cases have been known to occur when using Twisted reactors or other threading libraries. If you wish for the interpreter to be terminated after function completion, set the _kill_process keyword to True.
PiCloud allows you to register a function to be invoked periodically on PiCloud. The function and its accompanying schedule are referred to as a cron. Be aware that PiCloud schedules crons in the GMT (UTC+0) time zone.
For more information on crons, see the module documentation: cloud.cron
cloudconf.py manages configuration for the cloud module. It is a regular python script file which is run when cloud is imported. You can use this file to set your account information, use the simulator by default, or tweak more advanced settings.
cloudconf.py is located at $HOME/.picloud on UNIX-like systems (Linux, OSX) and %APPDATA%/picloud on Windows (replace picloud with picloud3 if you are using python3.x). The exact directory that this configuration file is stored in is returned by cloud.getconfigpath().
If you wish to set configuration at runtime, use the cloud.config interface. All of the settings that exist in cloudconf.py exist as properties of this interface. Note that changes do not take effect immediately; you must call cloud.config.commit() to update cloud.
Warning
Calling cloud.config.commit() will disconnect all currently running cloud instances and reset any earlier cloud.setkey() calls.
The cloud module includes a stripped down version of the PiCloud server, which can be used on a single machine. The interface of the cloud.mp module mirrors cloud, except that all jobs are run locally through the use of the multiprocessing library.
Specifically, a pool of child processes is created that will run jobs. As multiple processes are used, jobs can be run simultaneously on multiple CPU cores.
Note
As with the regular cloud module, cloud.mp can be used in functions executing on PiCloud.
Warning
This is not a complete simulation of the PiCloud server. Also, recall that deadlock is possible if jobs call cloud.call() recursively; be sure that the maximum number of jobs that need to be run simultaneously is lower than the configured size of the process pool. See the pitfall section for more information.
Every time you run a job with cloud.mp, the job (result, status, etc.) is held in memory. As more jobs run, more memory will be consumed by the Python process. There are two methods that can be used to prevent this ever-growing memory consumption:
The child process pool is run in daemon mode. When the main process terminates, so will the children, killing any active job. To ensure that all of your jobs are run, you must use cloud.join() or cloud.result() to block until they are complete.
To verify that your code works, you may wish to use the PiCloud simulator. The simulator is effectively just cloud.mp with separate configuration ability. With the simulator, code is significantly easier to debug (esp. with breakpoints); the simulator also verifies that your data can be sent over to the PiCloud servers.
To use the simulator run:
cloud.start_simulator()
Or edit cloudconf.py, and set use_simulator to True.
The simulator is not a perfect simulation of PiCloud. In the simulator, you will have access directly to your computer’s file system, which of course is not available on PiCloud. Furthermore, module name conflicts are not detected nor is the use of a non-synchronized extension. cloud.files is not simulated; it runs exactly as it would on PiCloud. Finally, the termination issues that apply to cloud.mp also apply to the simulator.
Cloud utilizes the logging module to note important events. All log messages are sent to the Cloud logger, to which your own module can add handlers.
By default, all log messages at INFO level or higher are written to $HOME/.picloud/cloud.log. You can change this behavior by setting log_level in cloudconf.py.
To properly support WSGI, no logs are printed to stderr by default. If you wish to change this behavior, set print_log_level in cloudconf.py to something below CRITICAL.
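Attaching your own handler uses only the standard logging module. The sketch below takes the logger name "Cloud" from the text above; the in-memory buffer, the format string, and the two records emitted by hand are illustrative, since in real use the cloud module would be the one producing the messages.

```python
import io
import logging

# "Cloud" is the logger name given in the text above.
cloud_logger = logging.getLogger("Cloud")

buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setLevel(logging.WARNING)  # this handler only keeps WARNING and above
handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
cloud_logger.addHandler(handler)

# Lower the logger's own threshold only so that both demo records reach
# the handler stage; the handler's level then does the filtering.
cloud_logger.setLevel(logging.DEBUG)
cloud_logger.info("routine event")      # below WARNING, dropped by our handler
cloud_logger.warning("something odd")   # captured by our handler

captured = buffer.getvalue()
```

Only the warning ends up in captured, which shows that a handler's own level is applied independently of whatever the logger writes to its log file.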
Cloud can log all data being serialized by the extended pickler and sent to PiCloud. These logs are especially useful if a pickling error occurs.
In the cloud config, serialize_logging controls whether serialization logs are written when using cloud. force_serialize_logging (in the simulation section) will switch on serialize_logging if the simulator is running. Their respective defaults are False and True. Note that _fast_serialization must also be set to 0 (default) for logging to occur.
The actual logs are found in the datalogs subdirectory of $HOME/.picloud/ ($HOME/.picloud3/ on python3). Inside that directory is one subdirectory per cloud adapter (Simulation, HTTPConnection, Multiprocessing). Within each of these are timestamped subdirectories noting when the cloud was opened.
The XML files within the timestamped directories describe transmitted serialized data. They are formatted as <call/map>.module.function_name.<sequence>.<func/args>.xml. For instance, if you run:
cloud.call(mymodule.foo,arg1,arg2)
you will see call.mymodule.foo.1.func.xml showing data describing mymodule.foo and call.mymodule.foo.1.args.xml showing data associated with arguments (arg1, arg2) being passed into mymodule.foo. The next identical cloud.call will produce call.mymodule.foo.2.func.xml and call.mymodule.foo.2.args.xml.
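When sifting through a directory of these logs, it can help to split the file names back into their fields. The function below is a sketch that assumes exactly the <call/map>.module.function_name.<sequence>.<func/args>.xml layout described above; parse_log_name and the keys of the returned dictionary are hypothetical names.

```python
def parse_log_name(filename):
    """Split a serialization log file name into its parts.

    The module path may itself contain dots, so the fixed fields are
    peeled off both ends and the remainder is treated as
    module.function_name.
    """
    parts = filename.split(".")
    if len(parts) < 6 or parts[-1] != "xml":
        raise ValueError("unexpected log file name: %r" % filename)
    middle = parts[1:-3]  # module components followed by the function name
    return {
        "kind": parts[0],                  # "call" or "map"
        "module": ".".join(middle[:-1]),
        "function": middle[-1],
        "sequence": int(parts[-3]),
        "payload": parts[-2],              # "func" or "args"
    }


simple = parse_log_name("call.mymodule.foo.1.func.xml")
nested = parse_log_name("map.pkg.sub.bar.12.args.xml")
```

Parsing from both ends keeps dotted module paths intact, so the second name above resolves to module pkg.sub and function bar.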
A log might look something like:
<?xml version="1.0" encoding="utf-8"?>
<!-- Properties minimumObjectSizePrinted='0' typeCompositionShown='False' showPrimitives='False' -->
<PickledObject type='function' size='944' memo_id='26' isLambda='True' module='__main__' fileName='/home/astaley/workspace/AutoTagger/src/autotagger/cloud/tests/test_cloud.py' firstLineNumber='81' TransmittedFunctionCode='True'>
  <Attribute name='globals' type='dict' size='510' memo_id='27' numElements='1' keyType='str' valueType='function'>
    <Value key='conc' keyObjId='3085541792' type='function' size='504' memo_id='62' funcName='conc' module='autotagger2.scheduler' fileName='/usr/local/lib/python2.6/dist-packages/autotagger2/scheduler/__init__.py' firstLineNumber='8'>
      <Attribute name='attr' type='function' size='410' memo_id='55' funcName='osmod2' module='__main__' fileName='/home/astaley/workspace/AutoTagger/src/autotagger/cloud/tests/test_cloud.py' firstLineNumber='53' TransmittedFunctionCode='True'>
        <Attribute name='globals' type='dict' size='20' memo_id='56' numElements='1' keyType='str' valueType='module'>
          <Value key='os' keyObjId='3085749056' type='module' size='14' memo_id='59' module='os'/>
        </Attribute>
        <Attribute name='closure' type='list' size='3' memo_id='60' numElements='0'/>
        <Attribute name='function_attributes' type='dict' size='3' memo_id='61' numElements='0'/>
      </Attribute>
    </Value>
  </Attribute>
  <Attribute name='closure' type='list' size='3' memo_id='63' numElements='0'/>
  <Attribute name='function_attributes' type='dict' size='3' memo_id='64' numElements='0'/>
</PickledObject>
This describes a transmitted function and all of its references (a child xml node is being referenced by its parent). The function has several attributes, such as the global variables it references. Note that the type of the globals is ‘dict’. The dictionary in turn references various Values (with ‘keys’ to reference them).
Other notable attributes:
- name: Name of an attribute in a class or function
- type: Type of this object
- size: Amount of space this data is taking in the serialized pickle (including references).
- memo_id: When an object is referenced multiple times, it is referenced by this id.
- fileName/firstLineNumber: Where a function is located
- numElements: Number of elements in a sequence type.
By default, primitives (strings, integers, floats, etc.) will not be printed out.
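Since each node carries a size attribute, a short script can point out what is making a pickle large. The sketch below uses the standard xml.etree.ElementTree module on an abridged, hand-shortened sample in the format shown above; largest_objects is a hypothetical helper, not part of the cloud module.

```python
import xml.etree.ElementTree as ET

# Abridged sample in the format shown above (attributes trimmed for brevity).
SAMPLE_LOG = """<PickledObject type='function' size='944' memo_id='26' module='__main__'>
  <Attribute name='globals' type='dict' size='510' memo_id='27' numElements='1'>
    <Value key='conc' type='function' size='504' memo_id='62' module='autotagger2.scheduler'/>
  </Attribute>
  <Attribute name='closure' type='list' size='3' memo_id='63' numElements='0'/>
</PickledObject>
"""


def largest_objects(xml_text, top=3):
    """Return (size, tag, label) tuples for the biggest serialized objects."""
    root = ET.fromstring(xml_text)
    rows = []
    for node in root.iter():  # walks the root and every descendant
        if "size" not in node.attrib:
            continue
        # Prefer the attribute name, then the dict key, then the type.
        label = node.get("name") or node.get("key") or node.get("type")
        rows.append((int(node.get("size")), node.tag, label))
    rows.sort(reverse=True)
    return rows[:top]


biggest = largest_objects(SAMPLE_LOG)
```

Keep in mind that size includes references, so a parent node's size already counts its children; the largest leaf nodes are usually the more useful lead. For a log on disk, ET.parse(path).getroot() would replace the ET.fromstring call.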
When running the cloud in multiprocessing mode, a dump of the serialized result will also be available. Standard output and standard error will also be written, using the same filename format as serialized data dumps.
To provide an easy way to replace calls to the multiprocessing Pool library, cloud.pool_interface exists. This module has nearly the same interface as multiprocessing.Pool. However, because it lacks many features of the cloud module, it is not recommended for production use.
With cloud.account, it is possible to manage account configuration programmatically, specifically:
- Provision API keys
- Manage real time compute units
See documentation on cloud.account for more information.