Finding frequent items in a data stream
Posted: November 13th, 2009 | Author: Alex | Filed under: Algorithms, Data Mining, python

In Finding the Frequent Items in Streams of Data [PDF], Graham Cormode and Marios Hadjieleftheriou discuss the frequent items problem and some of the algorithms that are used to solve it:
The frequent items problem is to process a stream of items and find all those which occur more than a given fraction of the time. It is one of the most heavily studied problems in mining data streams, dating back to the 1980s. Many other applications rely directly or indirectly on finding the frequent items, and implementations are in use in large scale industrial systems. In this paper, we describe the most important algorithms for this problem in a common framework. We place the different solutions in their historical context, and describe the connections between them, with the aim of clarifying some of the confusion that has surrounded their properties.
Two aspects make this problem interesting: the data stream can easily contain millions (or billions) of items, and the algorithm typically gets only one look at each item as it passes by in the stream.
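For comparison, the frequent items problem quoted above can be solved exactly when memory is not a concern: count every distinct item, then keep those whose count exceeds the chosen fraction of the stream length. The following minimal Python 3 sketch does that in a single pass. The function name `exact_frequent_items` and the threshold parameter `phi` are my own illustration, not taken from either paper. Its memory grows with the number of distinct items, which is precisely what streaming algorithms such as Space-Saving avoid by keeping a fixed number of counters.

```python
from collections import Counter
from typing import Hashable, Iterable, List


def exact_frequent_items(stream: Iterable[Hashable], phi: float) -> List[Hashable]:
    """Return every item that occurs in more than a fraction phi of the stream.

    Hypothetical helper for illustration: exact, but it keeps one counter per
    distinct item, so memory is unbounded for streams with many distinct items.
    """
    counts: Counter = Counter()
    n = 0
    for item in stream:  # a single pass, one look at each item
        counts[item] += 1
        n += 1
    # Keep only the items above the threshold phi * n.
    return [item for item, c in counts.items() if c > phi * n]
```

For instance, `exact_frequent_items("abracadabra", 0.3)` returns `['a']`, since 'a' occurs 5 times out of 11 and 5 > 0.3 * 11 = 3.3.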
Space-Saving
In this post I focus on the Space-Saving algorithm and provide an implementation in Python. The algorithm was originally described in Efficient Computation of Frequent and Top-k Elements in Data Streams [PDF] by Ahmed Metwally, Divyakant Agrawal, and Amr El Abbadi:
We propose an integrated approach for solving both problems of finding the most popular k elements, and finding frequent elements in a data stream. Our technique is efficient and exact if the alphabet under consideration is small. In the more practical large alphabet case, our solution is space efficient and reports both top-k and frequent elements with tight guarantees on errors. For general data distributions, our top-k algorithm can return a set of k’ elements, where k’ ≈ k, which are guaranteed to be the top-k’ elements; and we use minimal space for calculating frequent elements. For realistic Zipfian data, our space requirement for the frequent elements problem decreases dramatically with the parameter of the distribution; and for top-k queries, we ensure that only the top-k elements, in the correct order, are reported. Our experiments show significant space reductions with no loss in accuracy.
The algorithm works like this: the stream is processed one item at a time, and a collection of at most k distinct items and their associated counters is maintained. If a new item is encountered and fewer than k items are in the collection, the item is added and its counter is set to 1. If the item is already in the collection, its counter is increased by 1. If the item is not in the collection and the collection already holds k items, the item with the lowest counter is removed and the new item is added, with its counter set to one more than the removed item's counter (the previous minimum).
Here is some pseudocode to make this clearer:

```
SpaceSaving(k, stream):
    collection = empty collection
    for each element in stream:
        if element in collection:
            collection[element] += 1
        else if length of collection < k:
            add element to collection, collection[element] = 1
        else:
            current_minimum_element = element with lowest count value in collection
            current_minimum = collection[current_minimum_element]
            remove current_minimum_element from collection
            collection[element] = current_minimum + 1
    return collection
```

The straightforward approach
A first, straightforward implementation uses a plain dictionary (hashtable) as the collection, as in the following code:
```python
def space_saving_frequent_k1(k, stream):
    def get_smallest_key(d):
        """
        Given dictionary d, returns the key associated with the lowest
        value in the dictionary.
        """
        return min(d, key=d.get)

    counters = {}
    for element in stream:
        if element in counters:
            # Already monitored: increase its counter.
            counters[element] += 1
        elif len(counters) < k:
            # Still room: start monitoring with a count of 1.
            counters[element] = 1
        else:
            # Full: evict the element with the lowest count (a linear scan
            # over all k counters); the new element takes over that count
            # plus one.
            current_minimum_key = get_smallest_key(counters)
            counters[element] = counters.pop(current_minimum_key) + 1
    return counters
```
This works well for smaller data sets, in particular when the number of distinct elements never exceeds k, so the smallest element never has to be found. Otherwise, every eviction requires a linear scan over all k counters to find the element with the minimum count, which becomes comparatively costly when evictions are frequent.
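The dictionary-based version keeps only counts. The Metwally et al. paper additionally records, for each monitored element, how much of its count may have been inherited from an evicted element. The following Python 3 sketch adds that bookkeeping to the dictionary approach; the function name `space_saving_with_errors` and the `(count, error)` tuple layout are my own, hypothetical choices. It makes two properties easy to check: every step raises the sum of all counters by exactly one, so the counters always sum to the number of items seen, and each count overestimates the element's true frequency by at most the recorded error.

```python
from typing import Dict, Hashable, Iterable, Tuple


def space_saving_with_errors(
    k: int, stream: Iterable[Hashable]
) -> Dict[Hashable, Tuple[int, int]]:
    """Space-Saving with k counters; maps element -> (count, error).

    Hypothetical sketch: count - error <= true frequency <= count.
    """
    if k < 1:
        raise ValueError("k must be at least 1")
    summary: Dict[Hashable, Tuple[int, int]] = {}
    for element in stream:
        if element in summary:
            # Monitored element: +1 to its count, error unchanged.
            count, error = summary[element]
            summary[element] = (count + 1, error)
        elif len(summary) < k:
            # Room left: the count is exact, so the error is 0.
            summary[element] = (1, 0)
        else:
            # Evict the element with the smallest count (first one on ties,
            # found by a linear scan). The newcomer inherits that count plus
            # one, and the inherited part is recorded as its error.
            victim = min(summary, key=lambda e: summary[e][0])
            minimum, _ = summary.pop(victim)
            summary[element] = (minimum + 1, minimum)
    return summary
```

For example, `space_saving_with_errors(3, "abracadabra")` returns `{'a': (5, 0), 'b': (3, 2), 'r': (3, 2)}`: 'a' is counted exactly, while 'b' and 'r' (each truly seen twice) are reported with a count of 3 and an error of 2. Because the three counters sum to 11, the smallest is at most 11/3, and according to the paper any element occurring more often than the smallest counter is guaranteed to be monitored.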
Stream-Summary
When describing the Space-Saving algorithm, the authors also introduced the Stream-Summary data structure (inspired by work in Frequency Estimation of Internet Packet Streams with Limited Space [PDF]), which groups elements with equal values together (in buckets) and allows quick retrieval of the element with the lowest count.
Here is a diagram of this structure, using three buckets and a total of six elements (E1-E6).

Buckets are stored in a list sorted by the buckets' respective values. Each bucket keeps a list of the elements whose count equals its value, and each element in turn keeps a pointer to its bucket; these element-to-bucket pointers are stored in a simple hashtable. When an element's count needs to be increased, the element is removed from its current bucket and added to the neighboring bucket, whose value is one greater. If no such bucket exists, it is created and inserted into the bucket list. Buckets that become empty are removed.
The Python implementation using the Stream-Summary data structure may then look like this:
```python
class Bucket:
    """Groups all elements that currently share the same count (value)."""

    def __init__(self, value=1):
        self.value = value
        self.elements = []

    def __str__(self):
        return "%s: %s" % (self.value, str(self.elements))

    def append(self, element):
        self.elements.append(element)

    def first_element(self):
        if self.elements:
            return self.elements[0]
        else:
            return None

    def has_elements(self):
        return len(self.elements) > 0

    def remove(self, element):
        self.elements.remove(element)


class StreamSummary:
    """
    Maintains a dictionary of elements and a list of buckets. Each element
    points to a (parent) bucket. The bucket list is sorted based on the
    buckets' values. Each bucket also maintains a list of elements. This has
    the effect of grouping elements with equal values in buckets.
    """

    def __init__(self):
        self.elements = {}
        self.buckets = []

    def __len__(self):
        return len(self.elements)

    def __str__(self):
        return " ".join(str(b) for b in self.buckets)

    def add_element(self, element):
        """
        Adds an element and ensures it's assigned to the correct bucket.
        """
        if element not in self.elements:
            # New elements start with a count of 1, i.e. in the first bucket.
            if not self.buckets or self.buckets[0].value != 1:
                self.buckets.insert(0, Bucket())
            self.elements[element] = self.buckets[0]
            self.buckets[0].append(element)

    def increase_element(self, element):
        """
        Increasing an element's value also means assigning it to the correct
        bucket. That can result in creating a new bucket and/or removing an
        empty one.
        """
        current_bucket = self.elements[element]
        bucket_index = self.buckets.index(current_bucket)
        # Make sure a bucket with value + 1 directly follows the current one.
        if len(self.buckets) == bucket_index + 1:
            self.buckets.append(Bucket(value=current_bucket.value + 1))
        elif self.buckets[bucket_index + 1].value > current_bucket.value + 1:
            self.buckets.insert(bucket_index + 1,
                                Bucket(value=current_bucket.value + 1))
        current_bucket.remove(element)
        self.elements[element] = self.buckets[bucket_index + 1]
        # Empty buckets are dropped from the list.
        if not current_bucket.has_elements():
            del self.buckets[bucket_index]
        self.elements[element].append(element)

    def has_element(self, element):
        return element in self.elements

    def get_minimum(self):
        if self.buckets:
            return self.buckets[0].first_element()
        else:
            return None

    def replace_element(self, old_element, new_element):
        """
        Replaces an existing element with an entirely new element in the old
        element's bucket.
        """
        self.elements[new_element] = self.elements[old_element]
        self.elements[new_element].remove(old_element)
        self.elements[new_element].append(new_element)
        del self.elements[old_element]


def space_saving_frequent_k(k, stream):
    summary = StreamSummary()
    for element in stream:
        if summary.has_element(element):
            summary.increase_element(element)
        elif len(summary) < k:
            summary.add_element(element)
        else:
            current_minimum_key = summary.get_minimum()
            if current_minimum_key is not None:
                # Evict the minimum; the newcomer inherits its count, plus one.
                summary.replace_element(current_minimum_key, element)
                summary.increase_element(element)
            else:
                summary.add_element(element)
    return summary
```
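For larger data sets, where k is noticeably smaller than the number of distinct elements, evictions happen frequently, and this is where the Stream-Summary data structure proves advantageous: the element with the lowest count is always the first element of the first bucket, so no scan over all k counters is needed.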
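One caveat about the Stream-Summary implementation: `increase_element` locates a bucket with `self.buckets.index(...)` and inserts or deletes buckets in a Python list, and both operations are linear in the number of buckets. As described in the Metwally et al. paper, the buckets instead form a doubly linked list, so moving an element to the neighboring bucket takes constant time. The following Python 3 sketch illustrates that idea; the names `LinkedStreamSummary`, `increment`, `replace_minimum` and `space_saving_linked` are hypothetical, not from the post or the paper. Each bucket keeps its elements in a dict used as an insertion-ordered set, so that on ties the evicted element is always the oldest one in the smallest bucket.

```python
from typing import Dict, Hashable, Iterable, Optional


class Bucket:
    """One node of a doubly linked list of buckets, sorted by ascending value."""

    def __init__(self, value: int) -> None:
        self.value = value
        # A dict used as an insertion-ordered set, so ties break deterministically.
        self.elements: Dict[Hashable, None] = {}
        self.prev: Optional["Bucket"] = None
        self.next: Optional["Bucket"] = None


class LinkedStreamSummary:
    """Hypothetical Stream-Summary variant whose buckets form a doubly linked list."""

    def __init__(self) -> None:
        self.head: Optional[Bucket] = None  # bucket with the smallest value
        self.bucket_of: Dict[Hashable, Bucket] = {}  # element -> its bucket

    def __len__(self) -> int:
        return len(self.bucket_of)

    def _link_after(self, prev: Optional[Bucket], value: int) -> Bucket:
        """Create a bucket holding `value` right after `prev` (at the head if None)."""
        bucket = Bucket(value)
        bucket.prev = prev
        bucket.next = prev.next if prev is not None else self.head
        if bucket.next is not None:
            bucket.next.prev = bucket
        if prev is not None:
            prev.next = bucket
        else:
            self.head = bucket
        return bucket

    def _unlink(self, bucket: Bucket) -> None:
        """Drop an empty bucket from the list."""
        if bucket.prev is not None:
            bucket.prev.next = bucket.next
        else:
            self.head = bucket.next
        if bucket.next is not None:
            bucket.next.prev = bucket.prev

    def add(self, element: Hashable) -> None:
        """Start monitoring a new element with count 1."""
        if self.head is None or self.head.value != 1:
            self._link_after(None, 1)
        self.head.elements[element] = None
        self.bucket_of[element] = self.head

    def increment(self, element: Hashable) -> None:
        """Move element to the neighboring bucket with value + 1, without any search."""
        bucket = self.bucket_of[element]
        target = bucket.next
        if target is None or target.value != bucket.value + 1:
            target = self._link_after(bucket, bucket.value + 1)
        del bucket.elements[element]
        target.elements[element] = None
        self.bucket_of[element] = target
        if not bucket.elements:
            self._unlink(bucket)

    def replace_minimum(self, element: Hashable) -> None:
        """Evict the oldest element of the smallest bucket; the newcomer gets min + 1."""
        head = self.head
        victim = next(iter(head.elements))
        del head.elements[victim]
        del self.bucket_of[victim]
        head.elements[element] = None
        self.bucket_of[element] = head
        self.increment(element)


def space_saving_linked(k: int, stream: Iterable[Hashable]) -> Dict[Hashable, int]:
    """Space-Saving on top of LinkedStreamSummary; returns element -> count."""
    if k < 1:
        raise ValueError("k must be at least 1")
    summary = LinkedStreamSummary()
    for element in stream:
        if element in summary.bucket_of:
            summary.increment(element)
        elif len(summary) < k:
            summary.add(element)
        else:
            summary.replace_minimum(element)
    return {e: b.value for e, b in summary.bucket_of.items()}
```

For example, `space_saving_linked(3, "abracadabra")` returns `{'a': 5, 'b': 3, 'r': 3}`. The design choice is the usual one for this structure: the element-to-bucket dictionary gives direct access to an element's bucket, and the `prev`/`next` links let a bucket be created or removed next to it without touching the rest of the list.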
Onward
There is a lot of ongoing research in this problem area, and this article offers only a small (and simplified) glimpse of it. Explore the research, and find out which real-world applications use some version of this as part of their problem-solving approach. Applications can be found in web access log processing, search applications, mining of real-time message streams, and so forth.