Queue

Queues provide an interface for Dataflow Programming on PiCloud that is built on top of our job system.

While a distributed queue data structure is offered with push and pop capabilities, the key benefit is the ability to bind a queue to a Message Handler for scalable processing of the queue’s data. The Message Handler in turn can feed its output to other queues enabling you to build pipelines. You only pay for computation while messages in a queue are being processed by the message handler.

You’ll need version 2.7.6 of the cloud client to use queues.

[Figure: input -> f() -> output — a queue feeds a message handler, whose output feeds another queue.]

If you’re well-versed in our job processing framework, see Comparison with Job Framework.

Note

Queues can currently only be accessed through our Python module or REST.

Creating a Queue

There’s no need to create a queue explicitly. Whenever you reference a queue, it is automatically created on our systems, if it doesn’t already exist.

In Python, you access a queue through our CloudQueue object, obtained with cloud.queue.get().

>>> q = cloud.queue.get('numbers')
>>> q
CloudQueue('numbers')

The queue-specific functionality we’ll cover in this article can be accessed from this object.

Pushing Messages

In Python, push should always be given a list of messages. In the example below, 5 messages are added, each of which is a number.

>>> q.push([1,2,3,4,5])

You can also specify a delay if you want the messages to only be visible to readers after delay seconds.

>>> q.push([6], delay=20)

Messages can be anything that's serializable in Python using pickle, including lists, dictionaries, and custom-defined Python objects.

>>> q.push([{'secret_code': 'XYZ_ABC'}])
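Under the hood, a message must survive a pickle round trip: push serializes it, and pop deserializes it on the way out. A minimal local sketch of what that entails, using a hypothetical Order class of our own:

```python
import pickle

# A hypothetical custom class; anything picklable can be a message.
class Order(object):
    def __init__(self, item, qty):
        self.item = item
        self.qty = qty

# push() effectively pickles each message before transmitting it,
# and pop() unpickles it on the way out.
payload = pickle.dumps({'order': Order('widget', 3), 'priority': 1})
restored = pickle.loads(payload)
print(restored['order'].item, restored['order'].qty, restored['priority'])
```

If pickle.dumps raises on your object (open sockets, file handles, and the like are not picklable), it cannot be pushed as a message.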

Warning

Message Retention Period

If a message is not popped from a queue within 4 days of being pushed, it is automatically purged.

Counting Messages

count is only an approximation due to the distributed nature of the queue.

>>> q.count()
0
>>> q.count()
6

Popping Messages

pop returns a list of messages popped from the queue. It blocks until at least one message is popped, or timeout is reached. pop takes two optional arguments:

  • max_count is the maximum number of messages that may be returned; valid ranges are 1 to 10 inclusive (default 10).
  • timeout controls the number of seconds (range 0 to 20 inclusive, default 20) to wait for a message to become available. If timeout is reached, an empty list is returned.
>>> q.pop(max_count=1)
[1]
>>> q.pop()
[5,3,2,4]
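Since pop returns an empty list once the timeout expires, a common pattern is to loop until that happens. The sketch below imitates the documented pop semantics with a purely local stand-in (LocalQueue is our own illustration, not part of the cloud module):

```python
import collections

class LocalQueue(object):
    """Local stand-in mimicking CloudQueue.pop(): returns up to
    max_count messages, or [] once nothing is left (as a real pop
    would after `timeout` seconds on an empty queue)."""
    def __init__(self, messages):
        self._q = collections.deque(messages)

    def pop(self, max_count=10, timeout=20):
        batch = []
        while self._q and len(batch) < max_count:
            batch.append(self._q.popleft())
        return batch

# Drain pattern: an empty list signals "no more messages for now".
q = LocalQueue(range(25))
drained = []
while True:
    batch = q.pop(max_count=10)
    if not batch:
        break
    drained.extend(batch)

print(len(drained))  # 25
```

Against the real service, each iteration of this loop may block for up to timeout seconds before returning the empty list.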

Warning

Do Not Use CTRL-C to Interrupt pop()

While CTRL-C (KeyboardInterrupt) will return you to your Python console, pop() will continue running in the background to completion, and any messages it pops for up to timeout seconds afterwards may be lost.

Queue is not FIFO

Because of the distributed nature of the queue, you should make no assumptions regarding the order that messages are popped.

Acknowledgements

Use acknowledgements if you want your queue to hold on to a copy of a message it has popped, until you explicitly acknowledge that the message can be deleted. If an acknowledgement is not received before a deadline that you specify has been reached, then the message reappears in the queue, and will be re-popped by a reader.

Using acknowledgements is critical for guaranteeing that messages will not be lost in the event of a program crash, or server failure. If you are an advanced user who needs access to the acknowledgement interface, refer to our module documentation for cloud.queue.CloudQueue.pop_tickets() and cloud.queue.MessageTicket.
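To make the lifecycle concrete, here is a purely local model of acknowledgement semantics. MiniAckQueue and its method names are our own illustration; the real interface is cloud.queue.CloudQueue.pop_tickets() and cloud.queue.MessageTicket, whose exact methods may differ:

```python
class MiniAckQueue(object):
    """Toy model: a popped message stays hidden until its deadline;
    ack() deletes it for good, otherwise it reappears in the queue."""
    def __init__(self):
        self.visible = []
        self.in_flight = []   # (message, deadline) tickets
        self.now = 0          # fake clock, in seconds

    def push(self, msgs):
        self.visible.extend(msgs)

    def tick(self, seconds):
        """Advance the clock; expired tickets return to the queue."""
        self.now += seconds
        for msg, deadline in [t for t in self.in_flight if self.now >= t[1]]:
            self.visible.append(msg)          # unacknowledged: comes back
        self.in_flight = [t for t in self.in_flight if self.now < t[1]]

    def pop_ticket(self, deadline=30):
        ticket = (self.visible.pop(0), self.now + deadline)
        self.in_flight.append(ticket)
        return ticket

    def ack(self, ticket):
        self.in_flight.remove(ticket)         # permanently deleted

q = MiniAckQueue()
q.push(['job-1'])
q.pop_ticket(deadline=30)
q.tick(31)                     # deadline passes with no ack...
reappeared = list(q.visible)   # the message came back
ticket = q.pop_ticket(deadline=30)
q.ack(ticket)                  # acknowledged in time
q.tick(60)
remaining = list(q.visible)    # gone for good
print(reappeared, remaining)
```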

If you’re using an Attached Message Handler, our systems will automatically acknowledge a message only after it’s been processed, guaranteeing that you won’t ever lose a message.

Attaching a Message Handler

When you attach a message handler to a queue, you’re attaching a function or command that will process every message in the queue. The function or command may run on more than one machine, depending on your configuration parameters. The output can optionally be passed to other queues; if no output queue is specified, the return value of your attachment is ignored.

To demonstrate, we’ll use an example of a message handler that doubles inputs from the numbers queue, and outputs to a queue named doubled.

[Figure: numbers -> double() -> doubled — the numbers queue feeds the double() handler, which outputs to the doubled queue.]

To create this pipeline in Python, do the following:

>>> def double(x):
...     return 2 * x
>>> numbers_q = cloud.queue.get('numbers')
>>> doubled_q = cloud.queue.get('doubled')
>>> numbers_q.attach(double, output_queues=[doubled_q])

Now, if you add numbers to the queue, you’ll see that they’ll be processed and put in the doubled queue.

>>> numbers_q.push([1,2,3,4,5,6,7,8,9,10])
>>> doubled_q.pop()
[2, 6, 14, 20]
>>> doubled_q.pop()
[4, 8, 18]

Note how there is no guaranteed ordering for numbers in the doubled queue, nor was there any ordering in the numbers queue.
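Conceptually, the attachment job runs a pop/handle/push loop. A minimal local model, using plain lists in place of real queues (run_attachment is our own sketch, not a real API):

```python
def run_attachment(handler, in_q, out_q):
    """Local model of an attachment job: pop each message, apply the
    handler, and push the result to the output queue."""
    while in_q:
        out_q.append(handler(in_q.pop(0)))

numbers, doubled = [1, 2, 3, 4, 5], []
run_attachment(lambda x: 2 * x, numbers, doubled)
print(doubled)  # [2, 4, 6, 8, 10]
```

This local model preserves order only because it is single-threaded; on the real service, as shown above, no ordering is guaranteed.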

What Can a Message Handler Do?

Like a job, a message handler can run any Python code.

When using compiled Python packages or non-Python programs, you may need to use an Environment.

Suppressing Output Messages

If your handler returns None for a given message, the return value is not pushed to any output queues.
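This makes it easy to write filtering handlers. A local model of the routing rule (route_output is our illustration of the documented behavior, not a real function in the cloud module):

```python
def route_output(result, out_q):
    """None suppresses output; anything else becomes one message."""
    if result is not None:
        out_q.append(result)

# A filtering handler: even numbers pass through, odd ones vanish.
def keep_even(x):
    return x if x % 2 == 0 else None

out = []
for msg in [1, 2, 3, 4, 5, 6]:
    route_output(keep_even(msg), out)
print(out)  # [2, 4, 6]
```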

Outputting Multiple Messages

By default, the return value of a handler will always be interpreted as a single message, even if the value is an iterable (e.g. a list). If you would like the iterable to be expanded into multiple messages, set the iter_output argument to True in attach. Note that with iter_output True, a None element is considered a valid message.

As an example, we’ll create a queue that should be filled with sentences. Our handler will generate a new message for every word in the sentence.

[Figure: sentences -> split() -> words — the sentences queue feeds the split() handler, which outputs to the words queue.]

As you can see below, split() returns an iterable (a list) consisting of each word in sentence.

>>> def split(sentence):
...     return sentence.split()
>>> sentences_q = cloud.queue.get('sentences')
>>> words_q = cloud.queue.get('words')

If we keep iter_output set to False (the default), the return value will be a single message:

>>> sentences_q.attach(split, output_queues=words_q, iter_output=False)
>>> sentences_q.push(['this is a test'])
...
>>> words_q.count()
1
>>> words_q.pop(max_count=1)
[['this', 'is', 'a', 'test']]

Instead, we’ll set iter_output to True, ensuring that each word is a separate message.

>>> sentences_q.attach(split, output_queues=words_q, iter_output=True)
>>> sentences_q.push(['this is a test'])
...
>>> words_q.count()
4
>>> words_q.pop(max_count=1)
['this']
>>> words_q.pop(max_count=3)
['is', 'a', 'test']

Who Processes Messages?

If you look at your Job Dashboard, you’ll see that a job was created in response to your push of numbers. Its label is set to “queue-{queue name}”, which is “queue-numbers” in this case. This job is long-running: it uses your attached function to continuously process messages until it has not received a new message for 20 seconds, at which point it finishes gracefully.

Maintaining State

The attachments so far maintain no state across messages. But what if you want some persistence between messages, whether it’s an aggregate value or a database connection?

To accomplish this, you can use an instance of a class as the message handler, rather than a function. The instance will need a message_handler() method that takes one argument, and operates just like the message handlers we’ve already discussed. Optionally, the instance can also have pre_handling() and post_handling() methods defined. pre_handling() will be called before any messages have been processed. post_handling() will be called when the job is finished processing messages.

Here’s an example that maintains a connection to a database. Assume each message is a product item. The handler outputs a new message that includes the item and price, which it has looked up from the database.

>>> class PriceExtractor(object):
...     def pre_handling(self):
...         self.db = connect_to_db()

...     def message_handler(self, item):
...          price = self.db.find({'item': item})
...          return {'item': item, 'price': price}

...     def post_handling(self):
...         self.db.disconnect()

>>> price_extractor = PriceExtractor()
>>> items_q = cloud.queue.get('items')
>>> price_q = cloud.queue.get('prices')
>>> items_q.attach(price_extractor, output_queues=[price_q])

Readers Per Job

By default, the job that is created to handle messages will only process one message at a time. If the job spends its time making network requests, or some other non-CPU-bound task, then you may benefit by having the job process multiple messages simultaneously.

In Python,

>>> q = cloud.queue.get('websites')
>>> out_q = cloud.queue.get('emails')
>>> q.attach(scrape_emails, output_queues=[out_q], readers_per_job=5)

The job that runs your handler creates readers_per_job subprocesses. The primary process pops messages from the input queue and feeds them to free subprocesses. Each subprocess in turn has an independent message handler; if your message handler is an instance (Maintaining State), one instance will be created per subprocess, and they will not share any state.

Warning

Readers Consume Memory

Each reader is a fork of the primary process, and will consume a minimum of 40MB. Choose a Core Type that has enough RAM for the number of readers you choose. Otherwise, jobs will error with memory allocation failures. Confusingly, these errors are dependent on when the Python garbage collector executes, so may not happen immediately.

Max Parallel Jobs

Set max_parallel_jobs to increase message processing throughput.

By default, only one job will be created when there are messages waiting in the queue for processing. For higher parallel processing capacity, you can set the maximum number of parallel jobs. We automatically scale the number of jobs from 0 to max_parallel_jobs based on the number of messages in the queue. For performance implications, see Attachment Throughput.

In Python,

>>> q = cloud.queue.get('websites')
>>> out_q = cloud.queue.get('emails')
>>> q.attach(scrape_emails, output_queues=[out_q], max_parallel_jobs=5)

You can use the Queue Web Dashboard to scale up and down the number of parallel jobs post-attachment.

Warning

Jobs won’t necessarily run immediately even if they’re created in response to new messages. To ensure responsiveness, you’ll need to use Realtime Cores.

Faster Message Processing

In addition to the above, many of the standard options available for job creation are available for attachments. This includes the ability to Choose a Core Type and Use Multicore.

>>> q = cloud.queue.get('images')
>>> output_q = cloud.queue.get('thumbnails')
>>> q.attach(generate_thumbnails,
             output_queues=output_q,
             readers_per_job=4,
             _type='f2',
             _cores=4)

Note

Beating the GIL with Readers Per Job

Since each reader runs in a separate process, each reader can take advantage of a separate core simultaneously without GIL contention.

For other considerations, see Performance.

Error Handling in Attachments

If an error is encountered while processing a message, it is printed to the standard error of the job, and the job continues processing messages. After the queue is exhausted, the job is marked as errored and its exception set to the first error that was encountered. In the Queues Dashboard, click “view jobs” to see which jobs are linked to the queue of interest.

When dealing with a message_handler that is an instance (Maintaining State), this same policy of logging and ignoring errors applies to pre_handling and post_handling. Even if pre_handling fails, the message_handler will continue to be invoked until the queue is exhausted. Again, the job’s exception will be set to the first error that occurred.

Retrying

If you want to automatically retry messages that raise Exceptions while being processed, you should specify three keywords in cloud.queue.CloudQueue.attach():

  • retry_on – A list of Exceptions that are safe to retry.
  • retry_delay – The minimum number of seconds before a message should be retried.
  • max_retries – The maximum number of times a message should be retried.

For example:

>>> import urllib2
>>> def scrape_webpage(url):
...     return urllib2.urlopen(url)
>>> urls_q = cloud.queue.get('urls')
>>> webpages_q = cloud.queue.get('webpages')
>>> urls_q.attach(scrape_webpage,
                  output_queues=webpages_q,
                  retry_on=[urllib2.HTTPError, urllib2.URLError],
                  retry_delay=60,
                  max_retries=3)
>>> urls_q.push(['http://www.this-url-doesnt-work.com/'])

The url http://www.this-url-doesnt-work.com/ will be retried up to 3 times, with a 60 second delay between attempts.

Explicit Retrying

To add retry logic directly into your function, just raise a cloud.queue.Retry Exception as follows:

def handler(msg, cur_retry):
    ...
    if should_retry:
        raise cloud.queue.Retry(delay=2**cur_retry, max_retries=3)

In addition, if your handler takes in a cur_retry argument, it’ll be set to a number indicating how many retries have been attempted. This makes it easy to implement an exponential backoff as shown above.

Exceptions

If a message raises an Exception, and you don’t want to simply retry, you can use on_error to route messages to queues based on the Exception.

on_error maps Exception classes to dictionaries describing the action to take. Augmenting the previous example, we can send all messages that raise an Exception to a bad-urls queue by routing based on the top-level Exception class.

>>> bad_urls_q = cloud.queue.get('bad-urls')
>>> urls_q.attach(scrape_webpage,
                  output_queues=webpages_q,
                  on_error={Exception: {'queue': bad_urls_q, 'delay': 0}},
                  retry_on=[urllib2.HTTPError, urllib2.URLError],
                  retry_delay=60,
                  max_retries=3)

Since retry_on is also specified, URLErrors will first trigger retries, until max_retries has been reached, at which point an error message is pushed to the bad-urls queue.

Error messages are of the following form:

{
 'source_message': 'http://www.this-url-doesnt-work.com/',
 'traceback': '',
 'cur_retry': 3,
}

Note

Exceptions Matching Multiple on_error Keys

When an error occurs, the on_error dictionary is checked. If a parent of the message_handler’s exception is found as a key in on_error, the error message is pushed to the mapped queue. If multiple parents match, the closest according to Python’s method resolution order is used.
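This matching rule can be modeled in a few lines of plain Python (resolve_error_queue and the exception classes below are our illustration of the documented behavior, not real cloud-module APIs):

```python
def resolve_error_queue(exc_type, on_error):
    """Walk the exception class's MRO and return the action mapped to
    the first (closest) ancestor that appears in on_error."""
    for cls in exc_type.__mro__:
        if cls in on_error:
            return on_error[cls]
    return None

class NetworkError(Exception):
    pass

class ConnectTimeout(NetworkError):
    pass

on_error = {
    Exception: {'queue': 'bad-messages', 'delay': 0},
    NetworkError: {'queue': 'bad-urls', 'delay': 0},
}

# ConnectTimeout's closest on_error ancestor is NetworkError, not Exception.
print(resolve_error_queue(ConnectTimeout, on_error)['queue'])  # bad-urls
print(resolve_error_queue(ValueError, on_error)['queue'])      # bad-messages
```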

Querying Queue Info

You can get relevant information about a queue using info.

In Python,

>>> q.info()
{u'batch_size': 10,
 u'count': 0,
 u'created': u'2013-01-31 01:40:22',
 u'func_name': u'__main__.double at <ipython-input-1-dbe0eff3dc71>:1',
 u'has_attachment': True,
 u'max_parallel_jobs': 1,
 u'name': u'numbers',
 u'output_queues': [u'doubled'],
 u'processing_jobs': 0,
 u'queued_jobs': 0,
 u'readers_per_job': 8}

Deleting a Queue

To remove a queue, deleting all messages within the queue, call delete:

>>> q.delete()

Note

After deleting a queue, you cannot create another with the same name for one minute.

Performance

Pop Overhead

Popping from a low-latency connection to PiCloud takes around 65ms for a batch of 10 messages.

Push Overhead

Depending on your connection, a single push may take as little as 60ms. Since a push can have a batch of messages, the amortized overhead per message can be less than 3ms.

Push to Pop Lag Time

If a pop is waiting on an empty queue, and an enqueue occurs, it takes on average 30ms for the pop to receive the message.

Attachment Throughput

With max_parallel_jobs set to 1 and a no-op handler (def f(x): pass), you should see a throughput of 150 messages per second. With sustained enqueuing, you should be able to get 150 messages per second per job. In other words, to see a maximum throughput of 1,500 messages per second, you would set max_parallel_jobs to 10.
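Given the roughly 150 messages/second/job figure above, sizing max_parallel_jobs for a target throughput is a ceiling division. A small sketch (note the 150 figure is for a no-op handler; a real handler's per-job rate will be lower):

```python
import math

def jobs_needed(target_msgs_per_sec, per_job_rate=150):
    """Smallest max_parallel_jobs that meets the target throughput."""
    return int(math.ceil(target_msgs_per_sec / float(per_job_rate)))

print(jobs_needed(1500))  # 10
print(jobs_needed(400))   # 3
```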

When using message handlers in high-throughput scenarios, the pop and push overheads are inconsequential to throughput for the following reason: as your handler processes messages, it simultaneously makes pop requests for more messages, and pushes results to output queues.

Limitations

Number of Queues

You can create an unlimited number of queues. However, we do not suggest creating many short-lived queues with attachments. The overhead of running an attachment job (plus the time it takes for the job to run if you don’t have Realtime Cores), and the 20s it waits for a message before tearing down gracefully, makes it generally inefficient to have thousands of queues each with attachments.

In general, try to consolidate queues into as few as you can. We suggest adding more complexity to messages, and having smart attachments that can handle similar classes of messages.

Size of Messages

A message can be of any size. However, if your message (when pickled) is larger than 60 kB, it will need to be stored as a reference to a Bucket object stored under the queue/ prefix. While the Queue transparently handles this situation, this indirection will result in significant performance reduction (5x for a given number of readers and jobs). Consequently, we highly recommend keeping messages under 60 kB.
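A quick way to check ahead of time whether a message would cross the 60 kB threshold and be diverted through a Bucket reference (pickled_kb is our own helper for illustration):

```python
import pickle

def pickled_kb(message):
    """Approximate size of the message once pickled, in kB."""
    return len(pickle.dumps(message)) / 1024.0

small = {'item': 'widget', 'qty': 3}
big = {'blob': 'x' * 200000}

print(pickled_kb(small) < 60)   # True: sent inline
print(pickled_kb(big) > 60)     # True: would go through a Bucket
```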

Comparison with Job Framework

Our job system covered in the Primer lets you run any workload in the cloud, so why would you need Queues?

Short Jobs

If your job only takes a second to run, the overhead of submitting it, running it, and returning the result erodes much of the benefit of offloading to the cloud. For example, if you tried to process each message in a separate job, you’d get at most 5 jobs per second on a single core.

Component Oriented

If your workload is made up of many discrete components that are effectively black boxes with respect to each other, and can sufficiently communicate through message passing, queues with attachments make it much easier to manage and update your workflow.

Message Passing

From user feedback, we’ve heard time and time again that getting jobs to communicate with each other is difficult. Queues abstract communication with messages, and make message passing as simple as returning values.

Multi-threading

For tasks that are bound by network I/O, queues make it easy to run multiple processes in a single job without additional code.