Lazy merging in Python using streams

Recently, while solving a programming puzzle in Python, I needed to merge N iterators, each yielding values in sorted order, into a single iterator over all of the values in sorted order. The catch is that, when asked for the next merged value, you must extract the next value from each of the N iterators to determine which is the smallest, and then you can emit only that one value. So what do you do with the remaining N – 1 values you’ve extracted?
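
To make the difficulty concrete, here is a small sketch of the naive approach. The helper name `naive_first` and the sample data are hypothetical, and the sketch assumes Python 3 and non-empty inputs:

```python
def naive_first(iterators):
    """Return the smallest head value, consuming one value from EVERY iterator."""
    heads = [next(it) for it in iterators]  # extracts N values...
    return min(heads)                       # ...but emits only one of them

its = [iter([0, 1, 2]), iter([2, 3, 4]), iter([1, 5])]
smallest = naive_first(its)

# Whatever is still left in each iterator after the call above.
leftovers = [list(it) for it in its]
```

Here `smallest` is 0, but `leftovers` no longer contains the 2 and the 1 that headed the second and third iterators, so those extracted values would have to be stored somewhere.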

Instead of trying to find some place to store them, perhaps it would be better to avoid the problem altogether by never extracting more values than we are prepared to emit. We can do this by converting the iterators into an equivalent form in which the next value is always exposed, and hence available for making decisions before extraction. This form is essentially the stream of SICP (Structure and Interpretation of Computer Programs) fame.

The idea is to convert each Python iterator into either None (representing an empty stream) or a pair containing the iterator’s next value and the iterator itself:

def iterator_to_stream(iterator):
    """Convert an iterator into a stream (None if the iterator is empty)."""
    try:
        return next(iterator), iterator
    except StopIteration:
        return None

Then, to extract a value from a stream, you just apply stream_next to it, and it will hand you back the next value and the updated state of the stream (None once the underlying iterator is exhausted):

def stream_next(stream):
    """Get (next_value, next_stream) from a stream."""
    val, iterator = stream
    return val, iterator_to_stream(iterator)
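
As a quick check of how these two functions fit together, the following sketch walks a stream to exhaustion. It repeats both definitions so that it runs on its own and uses the built-in `next()` so that it also runs on Python 3. `stream_to_list` is a hypothetical helper introduced only for this illustration.

```python
def iterator_to_stream(iterator):
    """Convert an iterator into a stream (None if the iterator is empty)."""
    try:
        return next(iterator), iterator
    except StopIteration:
        return None

def stream_next(stream):
    """Get (next_value, next_stream) from a stream."""
    val, iterator = stream
    return val, iterator_to_stream(iterator)

def stream_to_list(stream):
    """Collect every remaining value of a stream (hypothetical helper)."""
    values = []
    while stream is not None:
        val, stream = stream_next(stream)
        values.append(val)
    return values

s = iterator_to_stream(iter("abc"))
head = s[0]                           # the next value is visible before extraction
rest = stream_to_list(s)              # walks the stream until it becomes None
empty = iterator_to_stream(iter([]))  # an empty iterator gives the empty stream
```

Note that `head` is read without calling `stream_next`, which is exactly the peeking ability a bare iterator lacks.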

Since streams expose their next value, they can be ordered by that value. And for my task that was the property that made all the difference:
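
The ordering property can be seen directly in a small sketch. Because a non-empty stream is a tuple whose first element is its next value, `min` with a key on that element picks the stream to draw from next without consuming anything. The sample data and the explicit `key` are choices made for this illustration (assuming Python 3), so that tied values never cause the iterators themselves to be compared.

```python
from operator import itemgetter

def iterator_to_stream(iterator):
    """Convert an iterator into a stream (None if the iterator is empty)."""
    try:
        return next(iterator), iterator
    except StopIteration:
        return None

streams = [iterator_to_stream(iter(xs)) for xs in ([4, 9], [1, 7], [3])]

# Choose the stream whose exposed next value is smallest.
smallest = min(streams, key=itemgetter(0))
head = smallest[0]

# Nothing was extracted by the comparison: every stream still exposes its head.
heads = [s[0] for s in streams]
```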

import heapq

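def merge(iterators):
    """Make a lazy sorted iterator that merges lazy sorted iterators."""
    streams = [iterator_to_stream(iter(it)) for it in iterators]
    # Heap entries are (next_value, index, stream). The index breaks ties, so
    # two iterators are never compared; empty streams never enter the heap.
    heap = [(s[0], i, s) for i, s in enumerate(streams) if s is not None]
    heapq.heapify(heap)
    while heap:
        _, i, stream = heapq.heappop(heap)
        val, stream = stream_next(stream)
        if stream is not None:
            heapq.heappush(heap, (stream[0], i, stream))
        yield val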

An example use:

>>> xs = merge([range(3), range(2, 9), range(5)])
>>> xs
<generator object merge at 0x7fea07c9d320>

>>> list(xs)
[0, 0, 1, 1, 2, 2, 2, 3, 3, 4, 4, 5, 6, 7, 8]
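
For comparison, the standard library's `heapq.merge` performs the same kind of lazy merge over sorted inputs. A minimal sketch, assuming Python 3 (hence `range`), that reproduces the example above and then shows the laziness on infinite inputs:

```python
import heapq
from itertools import count, islice

# Same inputs as the example above; heapq.merge takes them as separate arguments.
merged = heapq.merge(range(3), range(2, 9), range(5))
result = list(merged)

# Laziness: infinite sorted inputs are fine as long as only a prefix is consumed.
evens = count(0, 2)
odds = count(1, 2)
prefix = list(islice(heapq.merge(evens, odds), 6))
```

Here `result` matches the list shown above, and `prefix` holds the first six merged values of two endless sequences.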