Mapping Tips & Tricks

This article shows some common design patterns when using cloud.map().

Holding an Argument Constant

Sometimes you have a function that takes multiple arguments, but you only want to map a list of values over one of them.

For instance, you may have an add function:

>>> def add(x, y):
...     return x + y

And you want to set x=5, but map across many values for y (0, 1, 2):

  • add(5,0)
  • add(5,1)
  • add(5,2)

The easiest way is to make a new function, add_wrapper, that wraps add and fixes x at 5.

def add_wrapper(y):
    return add(5, y)

cloud.map(add_wrapper, [0, 1, 2])

This cloud.map creates 3 jobs that functionally accomplish add(5,0), add(5,1), and add(5,2). For brevity, you can use an anonymous closure (a lambda):

>>> cloud.map(lambda y: add(5, y), [0, 1, 2])

If you’re looking for more flexibility, see Python closures.
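For instance, a closure factory lets you choose the constant at call time. The following is a minimal sketch; the name make_adder is ours for illustration, not part of the cloud API:

>>> def make_adder(x):
...     def adder(y):
...         return add(x, y)
...     return adder
...
>>> cloud.map(make_adder(5), [0, 1, 2])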

Warning

Avoid functools.partial

Since cloud.map applies your map arguments positionally, they may collide with or override arguments already specified by partial.
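To see the problem locally, consider a partial that binds x by keyword; a positional argument then supplies x a second time (plain Python behavior, reproducible in any interpreter; the exact error wording varies by Python version):

>>> import functools
>>> add_five = functools.partial(add, x=5)
>>> add_five(0)  # cloud.map would invoke it positionally like this
Traceback (most recent call last):
  ...
TypeError: add() got multiple values for argument 'x'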

Creating Identical Jobs

For certain use cases, such as randomized simulations and load testing, you may wish to create multiple jobs with exactly the same arguments. Let’s assume you have a function f that takes 3 arguments.

def f(x, y, z):
    pass

You want to execute f(0, 1, 2) on the cloud 3 times in a map. The manual way to do this is to write:

cloud.map(f, [0, 0, 0], [1, 1, 1], [2, 2, 2])

To do this generically for any number of arguments and for any number of jobs N, use the following:

>>> args = [0, 1, 2]
>>> N = 3
>>> cloud.map(f, *zip(*([args] * N)))
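To see why this works, expand the expression step by step (plain Python, checkable in any interpreter):

>>> [args] * N                   # N copies of the argument list
[[0, 1, 2], [0, 1, 2], [0, 1, 2]]
>>> list(zip(*([args] * N)))     # transpose: one tuple per argument position
[(0, 0, 0), (1, 1, 1), (2, 2, 2)]

Unpacking the transposed tuples with * hands cloud.map one sequence per argument, which is exactly equivalent to the manual call above.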

Argument Chunking

Note

Don’t Want To Explicitly Chunk? See Queues.

Though they are a different paradigm, Queues have the advantage of automatic chunking and an amortized overhead of less than 50ms per effective job.

As explained in Characteristics of an Ideal Job, a job should take at least a second to run; the longer your jobs run, the less significant our system’s per-job overhead becomes. If each job in your map runs for only a short amount of time, you can chunk those jobs together to increase the length of time each job runs.

For example, if one call to f(x) takes 0.2s, you could merge the workloads of 50 jobs so that a single job invokes f(x) 50 times, for a total runtime of 10s.

To do this, you can use the chunk_list() and function_dechunker() functions we’ve written for you to convert the following:

# args could be a list of 10,000 values
>>> jids = cloud.map(f, args)
>>> results = cloud.result(jids)

To use chunking:

import itertools

chunk_size = 50

def chunk_list(lst, chunk_size):
    """Split a list into multiple lists of length chunk_size"""
    return [lst[i:i+chunk_size] for i in range(0, len(lst), chunk_size)]

def function_dechunker(func):
    """Return a function that applies ``func`` to every element of a list"""
    def wrapper_inner(chunked_args):
        # Regular Python map runs serially within the job; list() forces
        # evaluation so a plain list of results is returned
        return list(map(func, chunked_args))
    return wrapper_inner

# generates len(args)/chunk_size jobs
jids = cloud.map(function_dechunker(f), chunk_list(args, chunk_size))

# a list of lists is returned
chunked_results = cloud.result(jids)

# merge lists to mimic the non-chunked example
results = list(itertools.chain.from_iterable(chunked_results))

Note

This code only works if args is a list. If your arguments come from a generic iterable that cannot be materialized as a list due to memory constraints, consider writing your own generator function in lieu of using chunk_list, as sketched below.
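For example, here is a minimal generator-based chunker; the name chunk_iterable is ours, and we assume cloud.map accepts any iterable of arguments:

import itertools

def chunk_iterable(iterable, chunk_size):
    """Yield successive lists of up to chunk_size elements without
    materializing the whole input in memory."""
    it = iter(iterable)
    while True:
        chunk = list(itertools.islice(it, chunk_size))
        if not chunk:
            return
        yield chunk

jids = cloud.map(function_dechunker(f), chunk_iterable(args, chunk_size))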

Note

Within wrapper_inner, instead of the serial map, you can do a parallel local map with cloud.mp.map() and cloud.mp.result().
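A sketch of that parallel variant, assuming cloud.mp mirrors the cloud interface as the function names above suggest (the helper name function_dechunker_parallel is ours):

def function_dechunker_parallel(func):
    """Like function_dechunker, but fans the chunk out across local cores."""
    def wrapper_inner(chunked_args):
        # queue one local (multiprocessing) job per element of the chunk
        jids = cloud.mp.map(func, chunked_args)
        # block until all local jobs finish and return their results
        return cloud.mp.result(jids)
    return wrapper_inner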