Group Actions and Hashing Unordered Multisets

I learned of a neat result due to Kevin Ventullo that uses group actions to study the structure of hash functions for unordered sets and multisets. This piqued my interest because a while back a colleague asked me if I could think of any applications of “pure” group theory to practical computer programming that were not cryptographic in nature. He meant, not including rings, fields, or vector spaces whose definitions happen to be groups when you forget the extra structure. I couldn’t think of any at the time, and years later Kevin has provided me with a tidy example. I took the liberty of rephrasing his argument to be a bit simpler (I hope), but I encourage you to read Kevin’s post to see the reasoning in its original form.

Hashes are useful in programming, and occasionally one wants to hash an unordered set or multiset in such a way that the hash does not depend on the order the elements were added to the set. Since collection types are usually generic, one often has to craft a hash function for a set<T> or multiset<T> that relies on a hash function hash(t) of an unknown type T. To make things more efficient, rather than requiring one to iterate over the entire set each time a hash is computed, one might seek out a hash function that can be incrementally updated as new elements are added, and provably does not depend on the order of insertion.

For example, having a starting hash of zero, and adding hashes of elements as they are added (modulo $ 2^{64}$) has this incremental order-ignorant property, because addition is commutative and sums can be grouped. XOR-ing the bits of the hashes is similar. However, both of these strategies have downsides.

For example, if you adopt the XOR strategy for a multiset hash, then any element that has an even quantity in the multiset will be the same as if it were not in the set at all (or if it were in the set with some other even quantity). This is because x XOR x == 0. On the other hand, if you use the addition approach, if an element hashes to zero, its inclusion in any set has no effect on the hash. In Java the integer hash is the identity, so zero would be undetectable as a member of a multiset of ints in any quantity. Less drastically, a multiset with all even counts of elements will always hash to a multiple of 2, and this makes it easier to find hash collisions.
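To make these degeneracies concrete, here is a minimal sketch in Python. The function names and the identity base hash (chosen to mimic Java's integer hash) are assumptions made for illustration; they do not come from Kevin's post.

```python
from collections import Counter

MASK = 2**64 - 1  # keep every intermediate value within 64 bits


def identity_hash(x):
    """Hypothetical base hash: the identity on integers, as in Java."""
    return x & MASK


def xor_hash(multiset, base_hash=identity_hash):
    """XOR the base hash of each element, once per occurrence."""
    acc = 0
    for element, count in multiset.items():
        for _ in range(count):
            acc ^= base_hash(element)
    return acc


def add_hash(multiset, base_hash=identity_hash):
    """Add the base hash of each element, once per occurrence, mod 2**64."""
    acc = 0
    for element, count in multiset.items():
        acc = (acc + count * base_hash(element)) & MASK
    return acc


# An element with an even count is invisible to the XOR hash.
print(xor_hash(Counter({5: 2, 9: 1})) == xor_hash(Counter({9: 1})))
# An element that hashes to zero is invisible to the additive hash.
print(add_hash(Counter({0: 3, 7: 1})) == add_hash(Counter({7: 1})))
# All-even counts force the additive hash to be even.
print(add_hash(Counter({3: 2, 5: 4})) % 2 == 0)
```

All three comparisons hold, which is exactly the collision-prone behavior described above.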

A natural question arises: given the constraint that the hash function is accumulative and commutative, can we avoid such degenerate situations? In principle the answer should obviously be no, just by counting. I.e., the set of all unordered sets of 64-bit integers has size $ 2^{2^{64}}$, while the set of 64-bit hashes has size merely $ 2^{64}$. You will have many many hash collisions, and would need a much longer hash to avoid them in principle.

More than just “no,” we can characterize the structure of such hash functions. They impose an abelian group structure on the set of hashes. And due to the classification theorem of finite abelian groups, up to isomorphism (and for 64-bit hashes) that structure consists of addition on blocks of bits with various power-of-2 moduli, and the blocks are XOR’ed together at the end.

To give more detail, we need to review some group theory, write down the formal properties of an accumulative hash function, and prove the structure theorem.

Group actions, a reminder

This post will assume basic familiarity with group theory as covered previously on this blog. Basically, this introductory post defining groups and actions, and this followup post describing the classification theorem for commutative (abelian) groups. But I will quickly review group actions since they take center stage today.

A group $ G$ defines some abstract notion of symmetries in a way that’s invertible. But a group is really meaningless by itself. It’s only useful when it “acts” upon a set. For example, a group of symmetries of the square acts upon the square to actually rotate its points. When you have multiple group structures to consider, it makes sense to more formally separate the group structure from the set.

So a group action is formally a triple of a group $ G$, a set $ X$, and a homomorphism $ f:G \to S_X$, where $ S_X$ is the permutation group (or symmetry group) of $ X$, i.e., the set of all bijections $ X \to X$. The permutation group of a set encompasses every possible group that can act on $ X$. In other words, every group is a subgroup of a permutation group. In our case, $ G$ and $ f$ define a subgroup of symmetries on $ X$ via the image of $ f$. If $ f$ is not injective, some of the structure in $ G$ is lost. The homomorphism determines which parts of $ G$ are kept and how they show up in the codomain. The first isomorphism theorem says how: $ G / \textup{ker} f \cong \textup{im} f$.

This relates to our hashing topic because an accumulative hash—and a nicely behaved hash, as we’ll make formal shortly—creates a group action on the set of possible hash values. The image of that group action is the “group structure” imposed by the hash function, and the accumulation function defines the group operation in that setting.

Multisets as a group, and nice hash functions

An appropriate generalization of multisets whose elements come from a base set $ X$ forms a group. This generalization views a multiset as a “counting function” $ T: X \to \mathbb{Z}$. The empty set is the function that assigns zero to each element. A positive value of $ k$ implies the entry shows up in the multiset $ k$ times. And a negative value is like membership “debt,” which is how we represent inverses, or equivalently set difference operations. The inverse of a multiset $ T$, denoted $ -T$, is the multiset with all counts negated elementwise. Since integer-valued functions generally form a group under point-wise addition, these multisets also do. Call this group $ \textup{MSet}(X)$. We will freely use the suggestive notation $ T \cup \{ x \} $ to denote the addition of $ T$ and the function that is 1 on $ x$ and 0 elsewhere. Similarly for $ T - \{ x \}$.

$ \textup{MSet}(X)$ is isomorphic to the free abelian group on $ X$ (because an instance of a multiset only has finitely many members). Now we can define a hash function with three pieces:

  • An arbitrary base hash function $ \textup{hash}: X \to \mathbb{Z} / 2^n \mathbb{Z}$.
  • An arbitrary hash accumulator $ \textup{h}: \mathbb{Z} / 2^n \mathbb{Z} \times \mathbb{Z} / 2^n \mathbb{Z} \to \mathbb{Z} / 2^n \mathbb{Z}$
  • A seed, i.e., a choice for the hash of the empty multiset $ s \in \mathbb{Z} / 2^n \mathbb{Z}$

With these three data we want to define a multiset hash function $ h^*: \textup{MSet}(X) \to \mathbb{Z} / 2^n \mathbb{Z}$ recursively as

  • $ h^*(\left \{ \right \}) = s$
  • $ h^*(T \cup \left \{ x \right \}) = h(h^*(T), \textup{hash}(x))$
  • $ h^*(T - \left \{ x \right \}) = \dots$

In order for the second bullet to lead to a well-defined hash, we need the property that the accumulation order of individual elements does not matter. Call a hash accumulator commutative if, for all $ a, b, c \in \mathbb{Z} / 2^n \mathbb{Z}$,

$ \displaystyle h(h(a,b), c) = h(h(a,c), b)$

This extends naturally to being able to reorder any sequence of hashes being accumulated.
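For a small hash width, the commutativity condition can be checked by brute force. The sketch below is only an illustration: the helper name `is_commutative` and the three example accumulators are assumptions, and the check enumerates all triples, so it is practical only for a handful of bits.

```python
from itertools import product

N_BITS = 4
SIZE = 1 << N_BITS  # work in Z / 2^4 Z so the search space is tiny


def is_commutative(h, size=SIZE):
    """Check h(h(a, b), c) == h(h(a, c), b) for every a, b, c."""
    return all(
        h(h(a, b), c) == h(h(a, c), b)
        for a, b, c in product(range(size), repeat=3)
    )


def add_accumulator(a, b):
    return (a + b) % SIZE


def xor_accumulator(a, b):
    return a ^ b


def polynomial_accumulator(a, b):
    # The usual order-sensitive list hash; included as a counterexample.
    return (31 * a + b) % SIZE


print(is_commutative(add_accumulator))         # True
print(is_commutative(xor_accumulator))         # True
print(is_commutative(polynomial_accumulator))  # False
```

The polynomial accumulator fails because it is designed to be order-sensitive, which is exactly what we must rule out for sets and multisets.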

The third is a bit more complicated. We need to be able to use the accumulator to “de-accumulate” the hash of an element $ x$, even when the set that gave rise to that hash didn’t have $ x$ in it to start.

Call a hash accumulator invertible if for a fixed hash $ z = \textup{hash}(x)$, the map $ a \mapsto h(a, z)$ is a bijection. That is, accumulating the hash $ z$ to two sets with different hashes under $ h^*$ will not cause a hash collision. This defines the existence of an inverse map (even if it’s not easily computable). This allows us to finish the third bullet point.

  • Fix $ z = \textup{hash}(x)$ and let $ g$ be the inverse of the map $ a \mapsto h(a, z)$. Then $ h^*(T - \left \{ x \right \}) = g(h^*(T))$

Though we don’t have a specific definition for the inverse above, we don’t need it because we’re just aiming to characterize the structure of this soon-to-emerge group action. Though, in all likelihood, if you implement a hash for a multiset, it should support incremental hash updates when removing elements, and that formula would apply here.

This makes $ h^*$ well defined. Commutativity allows us to define $ h^*(T \cup S)$ by decomposing $ S$ arbitrarily into its constituent elements (with multiplicity), and applying $ h^*(T \cup \{ x \})$ or $ h^*(T - \{ x \})$ in any order.
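As a concrete instance of these definitions, here is a minimal sketch of an incrementally updated multiset hash whose accumulator is addition modulo $ 2^n$. The class name and its methods are hypothetical. The inverse map $ g$ is subtraction, so removal works even for an element that was never added, matching the "membership debt" view of multisets.

```python
class AdditiveMultisetHash:
    """Order-independent multiset hash with h(a, z) = (a + z) mod 2**n."""

    def __init__(self, n_bits=64, seed=0, base_hash=hash):
        self.modulus = 1 << n_bits
        self.base_hash = base_hash
        self.value = seed % self.modulus  # the hash of the empty multiset

    def add(self, x):
        """Accumulate one occurrence of x."""
        self.value = (self.value + self.base_hash(x)) % self.modulus

    def remove(self, x):
        """De-accumulate one occurrence of x using the inverse map."""
        self.value = (self.value - self.base_hash(x)) % self.modulus


first = AdditiveMultisetHash()
for item in ["a", "b", "c"]:
    first.add(item)

second = AdditiveMultisetHash()
for item in ["c", "a", "b"]:
    second.add(item)

print(first.value == second.value)  # insertion order does not matter
```

Swapping addition for XOR, or for any other commutative and invertible accumulator, only changes the bodies of `add` and `remove`.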

A group action emerges

Now we have a well-defined hash function on a free abelian group.

$ \displaystyle h^*: \textup{MSet}(X) \to \mathbb{Z} / 2^n \mathbb{Z}$

However, $ h^*$ is not a homomorphism. There’s no reason hash accumulation needs to mesh well with addition of hashes. Instead, the family of operations “combine a hash with the hash of some fixed set” defines a group action on the set of hashes. Let’s suppose for simplicity that $ h^*$ is surjective, i.e., that every hash value can be achieved as the hash of some set. Kevin’s post gives the more nuanced case when this fails, and in that case you work within $ S_{\textup{im}(h^*)}$ instead of all of $ S_{\mathbb{Z} / 2^n \mathbb{Z}}$.

The group action is defined formally as a homomorphism

$ \displaystyle \varphi : \textup{MSet}(X) \to S_{\mathbb{Z} / 2^n \mathbb{Z}}$

where $ \varphi(T)$ is the permutation $ a \mapsto h(a, h^*(T))$. Equivalently, we start from $ a$, pick some set $ S$ with $ h^*(S) = a$, and output $ h^*(T \cup S)$.

The map $ \varphi$ is a homomorphism. The composition of two accumulations is order-independent because $ h$ is commutative. This is how we view $ h$ as “the binary operation” in $ \textup{im} \varphi$, because combining two permutations $ a \mapsto h(a, h^*(T))$ and $ a \mapsto h(a, h^*(S))$ is the permutation $ a \mapsto h(a, h^*(S \cup T))$.

And now we can apply the first isomorphism theorem, that

$ \displaystyle \textup{MSet}(X) / \textup{ker} \varphi \cong \textup{im} \varphi \subset S_{\mathbb{Z} / 2^n \mathbb{Z}}$

This is significant because any quotient of an abelian group is abelian, and this quotient is finite because $ S_{\mathbb{Z} / 2^n \mathbb{Z}}$ is finite. This means that the group $ \textup{im} \varphi$ is isomorphic to

$ \displaystyle \textup{im} \varphi \cong \bigoplus_{i=1}^k \mathbb{Z}/2^{n_i} \mathbb{Z}$

where $ n = \sum_i n_i$. The $ i$-th summand corresponds to a block of $ n_i$ bits of the hash, and within that block the operation is the usual addition modulo $ 2^{n_i}$. Here the “block” structure is where XOR comes in. Each block can be viewed as a bitmask with zeros outside the block, and two members are XOR’ed together, which allows the operations to apply to each block independently.

For example, the group might be $ \mathbb{Z} / 2^{4} \mathbb{Z} \times \mathbb{Z} / 2^{26} \mathbb{Z}\times \mathbb{Z} / 2^{2} \mathbb{Z}$ for a 32-bit hash. The first block corresponds to 32-bit unsigned integers whose top 4 bits may be set but all other bits are zero. Addition is done within those four bits modulo 16, leaving the other bits unchanged. Likewise, the second component has the top four bits zero and the bottom two bits zero, but the remaining 26 bits are summed mod $ 2^{26}$. XOR combines the bits from different blocks.
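Here is a minimal sketch of what such a block-structured accumulator could look like in code, using the 4/26/2 split as the default. The function name and the convention that block widths are listed from the most significant bits down are assumptions made for this example.

```python
def block_accumulate(a, b, block_sizes=(4, 26, 2)):
    """Combine two hashes by adding independently within each block of bits.

    block_sizes lists the block widths from the most significant bits down,
    and the widths sum to the total hash length.
    """
    result = 0
    shift = sum(block_sizes)
    for width in block_sizes:
        shift -= width
        mask = (1 << width) - 1
        block_a = (a >> shift) & mask
        block_b = (b >> shift) & mask
        block_sum = (block_a + block_b) & mask  # addition mod 2**width
        # Blocks occupy disjoint bits, so OR-ing them in equals XOR-ing them.
        result |= block_sum << shift
    return result


# Carries never leak from one block into the next.
print(block_accumulate(3, 1))                    # 0, not 4
# One big block is ordinary modular addition.
print(block_accumulate(2**32 - 1, 1, (32,)))     # 0
# One-bit blocks reduce to bitwise XOR.
print(block_accumulate(0b1011, 0b0110, (1,) * 32) == 0b1011 ^ 0b0110)
```

The two extreme choices of `block_sizes` recover plain addition and plain XOR, and everything in between is a mixture of the two.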

In one extreme case, you only have one block, and your group is just $ \mathbb{Z} / 2^n \mathbb{Z}$ and the usual addition combines hashes. In the other extreme, each bit is its own block, your group is $ (\mathbb{Z} / 2 \mathbb{Z})^n$, and the operation is a bitwise XOR.

Note, if instead of $ 2^n$ we used a hash of some other length $ m$, then in the direct sum decomposition above, $ m$ would be the product of the sizes of the components. The choice $ m = 2^n$ maximizes the number of different structures you can have.

Implications for hash function designers

Here’s the takeaway.

First, if you’re trying to design a hash function that avoids the degeneracies mentioned at the beginning of this article, then it will have to break one of the properties listed. This could happen, say, by maintaining additional state.

Second, if you’re resigned to use a commutative, invertible, accumulative hash, then you might as well make this forced structure explicit, and just pick the block structure you want to use in advance. Since no clever bit shifting will allow you to outrun this theorem, you might as well make it simple.

Until next time!

Searching for RH Counterexamples — Scaling Up

We’re ironically searching for counterexamples to the Riemann Hypothesis.

  1. Setting up Pytest
  2. Adding a Database
  3. Search Strategies
  4. Unbounded integers
  5. Deploying with Docker
  6. Performance Profiling

Last time we made the audacious choice to remove primary keys from the RiemannDivisorSums table for performance reasons. To help with that, we will do two things in this post:

  • Reduce the storage footprint of the whole application (it was 60 GiB when it crashed, and we got up to 84 prime factors).
  • Refactor the application into a worker architecture.

The first one is straightforward, but uses a possibly new idea to the mathematician, and the second is a relatively major rearchitecture.

Hashing a search block

Instead of storing every single Riemann divisor sum, we can just store the ones that are interesting, that is, the ones that are larger than some threshold—in our case, 1.767, which had just under a thousand examples in the last post.

That raises a problem. We want to be able to say that we checked all examples up to some limit. We’ve been saying “up to all numbers having at most K prime factors” because that’s how the superabundant search strategy enumerates the candidates. If we don’t store all the witness values, how can we prove to a skeptical observer that we checked what we said we checked?

One approach is to instead store a “summary string” derived from the full set of witness values. This “summary string” should be deterministically reproducible, so that if someone has the metadata about the block of numbers, they can recompute the summary and compare it with our summary.

A simple example summary might be the string-concatenation of all the witness values one after another, like 1.4323,1.5678,0.8792. But that still takes up a lot of space (each block had 250,000 different witness values). It would be better if we could find a summary that had a fixed size, regardless of how big the input is. Taking a cue from computer science, a hash function does exactly that. Hash functions aren’t perfect, they can have collisions, but the collisions are unlikely enough that, if you tested a hundred random blocks and all your hashes agreed with all of my hashes, you would be confident I checked everything the same as you. That gives you confidence that the rest of my blocks (say, a million of them) were computed correctly.

The hash function we’ll use is called sha256, and we add it in this pull request. Now the database application shouldn’t need a ton of disk space to run. (We’ll confirm this later in the article)
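As a rough illustration of the idea, here is a minimal sketch using Python's standard hashlib module. The function name and the fixed-precision text encoding of each witness value are assumptions made for this example; the encoding used in the pull request may differ.

```python
import hashlib


def hash_witness_values(witness_values):
    """Return a fixed-size hex digest summarizing a sequence of witness values."""
    hasher = hashlib.sha256()
    for value in witness_values:
        # Assumption: a fixed-precision text encoding, so that anyone
        # recomputing the block feeds exactly the same bytes to the hash.
        hasher.update(f"{value:.4f},".encode("utf-8"))
    return hasher.hexdigest()


digest = hash_witness_values([1.4323, 1.5678, 0.8792])
print(len(digest))  # 64 hex characters, no matter how many values were hashed
```

Unlike the multiset hashes discussed earlier, sha256 is order-sensitive. That is fine here because the search strategy enumerates the numbers in a block in a deterministic order.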

For more background and related topics around hash functions, see Load Balancing and the Power of Hashing and Hashing to Estimate the Size of a Stream.

Worker architecture

Next we want to rearchitect the application to enable two things: better protection against the possibility of duplicating/skipping divisor sum computations, and allowing us to scale up the rate of computation by adding additional computers to the mix. This pull request achieves the goal, but note that it’s a large change, so let’s cover the broad ideas, then zoom in on a few details, and finally discuss the PR’s structure as a whole from the perspective of software engineering.

We start by adding a “state” field to a search block. The state can be one of NOT_STARTED, IN_PROGRESS, FINISHED, FAILED. Then one worker job (read, container) will be responsible for creating new blocks in the NOT_STARTED state, and a group of worker jobs will continually “claim” an eligible block, compute the divisor sums, and then mark it as finished and insert the interesting witness values it found.

This suggests a drastic change to the database interface, as shown in this commit, to align with the new “claim/finish” workflow. We replace the existing functions with insert_search_blocks, claim_next_search_block, and finish_search_block. The first two operate only on the SearchMetadata table, while the third couples together the changes to both tables, by marking a block as finished and inserting the relevant witness values. We’ll see how we ensure these two updates occur atomically in a moment. Also, because of the new coupling of the two tables, I merged the two database interfaces into one.

We need two safety measures to make this work. First, we need to ensure that we don’t accidentally generate multiple duplicate search blocks. We do this by adding a uniqueness constraint on the bounds of a search block at the database level. If a client calls insert_search_blocks with something that’s already in the database, the database itself will reject it, and the client will be forced to catch the failure and recover.

Second, we need to ensure that two workers don’t claim the same search block. This involves a bit of database wizardry. I had to do a bit of research to make it work, and it took a few attempts. The gist of the problem is that if you want to claim a search block, you need to do two queries. First, you need to look up the oldest eligible search block that you want to claim. Then, you need to mark it as claimed. But if you have multiple workers accessing the database simultaneously, you can get this order of operations

  1. Worker 1 runs lookup query, gets search block 7
  2. Worker 2 runs lookup query, also gets search block 7
  3. Worker 1 runs update query, marks search block 7 as claimed
  4. Worker 2 runs update query, also marks search block 7 as claimed.

This can even happen if there’s only “one” query, and the lookup step is a subquery. After reading a bit about PostgreSQL’s transaction isolation guarantees, I learned that you can “lock” the rows read by the subquery until the update step is complete by adding FOR UPDATE to the subquery. This commit has the full query, but it looks something like this

UPDATE SearchMetadata
SET state = 'IN_PROGRESS'
FROM (
  SELECT starting_search_index
  FROM SearchMetadata
  WHERE state = 'NOT_STARTED'
  ORDER BY creation_time ASC
  LIMIT 1
  FOR UPDATE         -- key clause!
) AS m
WHERE
  SearchMetadata.starting_search_index = m.starting_search_index;

I also added some special tests (that use the finished worker jobs) that will specifically fail when this FOR UPDATE clause is removed, to confirm it fixes the behavior. This is particularly important because it’s hard to test database queries that protect against race conditions. Plus, I tried other approaches and got it wrong, which proved the value of having such tests.

Finally, the finish_search_block function needs to ensure that the update of the SearchMetadata and RiemannDivisorSums tables happen atomically. This is done in this commit by leveraging the concept of a database transaction. In a transaction, all of the queries between BEGIN and COMMIT happen at once or not at all, e.g., in the case that a later query fails. The “not at all” case is called a rollback.

Thankfully, the psycopg2 library we’re using uses transactions by default, and requires us to call connection.commit to make any query persist. So we are already using transactions and get the guarantee without extra work.
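To see the all-or-nothing behavior in isolation, here is a minimal sketch that uses Python's built-in sqlite3 module as a stand-in for PostgreSQL, purely so the example runs without a database server. The simplified schema and column names are hypothetical and differ from the real application. The commit and rollback calls follow the same DB-API pattern that psycopg2 connections provide.

```python
import sqlite3


def make_demo_database():
    """Create an in-memory database holding one claimed block (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE SearchMetadata (id INTEGER PRIMARY KEY, state TEXT)")
    conn.execute(
        "CREATE TABLE RiemannDivisorSums "
        "(block_id INTEGER, witness_value REAL NOT NULL)"
    )
    conn.execute("INSERT INTO SearchMetadata VALUES (1, 'IN_PROGRESS')")
    conn.commit()
    return conn


def finish_search_block(conn, block_id, witness_values):
    """Mark a block finished and store its witness values, all or nothing."""
    try:
        conn.execute(
            "UPDATE SearchMetadata SET state = 'FINISHED' WHERE id = ?",
            (block_id,),
        )
        conn.executemany(
            "INSERT INTO RiemannDivisorSums (block_id, witness_value) VALUES (?, ?)",
            [(block_id, value) for value in witness_values],
        )
        conn.commit()  # both changes become visible together
    except sqlite3.Error:
        conn.rollback()  # undo the state change if any insert failed
        raise
```

If one of the inserts fails, for instance because a witness value is NULL, the rollback also undoes the state update, so the block stays IN_PROGRESS rather than being marked finished with missing data.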

Next, we needed to update the SearchStrategy interface to separate the task of generating new search blocks and processing blocks, since these will be done by separate workers. This commit updates the interface, this commit the tests, and this commit the implementation.

Finally, once all the rearchitecting and re-testing is done, we ditched our old populate_database entry script, and replaced it with a worker script for generating new search blocks, and one for claiming and processing blocks. These have the same general structure: infinitely loop, use the database and search strategy interfaces as expected, and then exponentially back off when failures occur, eventually quitting if there are too many repeated failures. In the case of the generator worker, we also have configuration around how many new blocks to create, what minimum threshold to trigger doing work, and a delay between checks (since the workers will be much slower than the generator).
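The shared structure of the two worker scripts can be sketched roughly as follows. This is not the code from the repository: the function name, its parameters, and the injected `sleep` argument (which makes the loop testable) are all assumptions made for illustration.

```python
import time


def run_worker(process_next_block, max_failures=5, base_delay=1.0, sleep=time.sleep):
    """Call process_next_block forever, backing off exponentially on failure.

    process_next_block is a hypothetical callable that claims and processes
    one block (or generates new blocks) and raises an exception on failure.
    """
    consecutive_failures = 0
    while True:
        try:
            process_next_block()
            consecutive_failures = 0  # any success resets the backoff
        except Exception:
            consecutive_failures += 1
            if consecutive_failures >= max_failures:
                raise  # too many repeated failures: quit
            # Wait base_delay, then 2x, 4x, 8x, ... before retrying.
            sleep(base_delay * 2 ** (consecutive_failures - 1))
```

With the defaults, a worker whose database connection stays broken waits 1, 2, 4, and 8 seconds between attempts and then exits on the fifth consecutive failure.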

Ship it!

Now we’re ready to launch it, and this time since we have improved our memory issues, we can use an AWS instance with smaller memory requirements, but launch four of them to allow one for each container. After a few of the following steps, this worked out just fine:

  • Configure an AWS security group (which limits which IP addresses EC2 instances can communicate with) so that Postgres communication was allowed between all the launched instances. This was made easy by the fact that you can configure a security group once so that it allows communication to and from any other instances with the same security group (in addition to my laptop’s IP).
  • Keep track of the IP addresses of each of the instances I launched, and make sure that the instance with a 60 GiB disk is the instance I put the database on.
  • Manually install docker and launch the right container on each one, and point their PGHOST variables to the address of the database instance.

I will improve this workflow in the next article, but for now I am letting it run for a few days and observing that it works. The generator job adds 100 new search blocks any time there are fewer than 100 eligible blocks, and this query shows they are getting worked on properly.

divisor=# select state, count(*) from searchmetadata group by state;
    state    | count 
-------------+-------
 NOT_STARTED |   129
 FINISHED    |   367
 IN_PROGRESS |     4

As I was stopping and restarting jobs, two search blocks got stuck in the IN_PROGRESS state, as you can see from the lagging-behind start time.

divisor=# select start_time, starting_search_index, ending_search_index from searchmetadata where state='IN_PROGRESS' order by start_time asc;
         start_time         | starting_search_index | ending_search_index 
----------------------------+-----------------------+---------------------
 2021-02-09 04:46:13.616085 | 64,433492             | 64,683491    <-- stuck!
 2021-02-09 04:48:08.554847 | 65,191862             | 65,441861    <-- stuck!
 2021-02-09 14:22:08.331754 | 78,9803652            | 78,10053651
 2021-02-09 14:22:44.36657  | 78,10053652           | 78,10303651
(4 rows)

This suggests I should create a new job for cleaning up “stale” IN_PROGRESS blocks and marking them as failed. I’ll do this in the next article, and it won’t hurt us to leave some blocks in the transient state, because once the cleanup job is running it will make those blocks eligible to be worked on again, and the workers claim the earliest block first.

I also noticed an issue where I forgot a commit statement, which didn’t show up in testing, and I can’t quite figure out how to set up a test that requires this commit statement to be present. Nevertheless, after I noticed the problem, I patched it, stopped the containers, ran git pull on each instance, rebuilt the containers, and started them again, and it continued along as if nothing was wrong.

Since the search application is running well now for long periods of time, I’ll let it run for a week or two (or until something breaks), and return to it in the next article to see what bigger witness values we find. In the meantime, I’d like to discuss how I organized all of the changes that went into that massive pull request.

How to organize a big pull request

The pull request to change the application architecture is quite substantial, and I had a lot of cleanup I wanted to do along the way.

The basic pattern of the PR, which was repeated a few times throughout, was to split each unit of change into three parts: update the interface, update the tests that use the interface, and update the implementation. As an example, one commit updates the SearchStrategy interface, one the tests, and one the implementation.

Doing it this way helps in a few ways. First, I think a lot about how the interface should work, and I prove that it feels right by writing tests first. That way, I don’t get bogged down in the implementation details when thinking about how it will be used. Likewise, when doing this for the database change, after updating the interface and tests, I first updated the InMemoryDatabase implementation, because it was simpler, and having this made me confident the interface could be implemented decently, and allowed me to tweak the interface early, before turning to the harder problem of figuring out PostgreSQL FOR UPDATE stuff.

Much like the proof of a mathematical theorem, the final commits hide the work that went into this, and commit messages, code comments, and PR description provide a deeper explanation. Typically what I do is make changes, and then use git’s staging area concept to pull out different sub-changes within a file into a single commit. This blog post about “using git well” covers this in more detail. But this, and git’s rebase and amend features, allowed me to edit older commits so that the final pull request lays out the commits in a logical order.

It also allows me to split off the cleanup parts as best as possible, such as this commit to rename SearchState to SearchIndex (after I added the new “state” field). I made the commit, had some places I missed and fixed later, extracted them into their own commit, and rebased and squashed them together to unify the change. Keeping each commit as close to an atomic thought as possible (“add this test” or “implement this method”) enables easy squashing and reordering.

Though nobody is reviewing my code now (except you, dear reader, after the fact), a primary benefit of all this cleanup is that the code is much easier to review. The pull request as a whole is a feature, each commit is a smaller, but comprehensible piece of that bigger feature, and the commits are laid out in such a way that if you browse the commits in order, you get a clear picture of how the whole feature was assembled.

If you find yourself doing mathematical work in the context of software, you should expect to spend a lot of time reviewing code and having your code reviewed. The extra effort you put into making that process easier pays off by producing better feedback, uncovering more bugs, and reducing total review time. Though it might be considered a soft skill, smooth code reviews are a critical part of a well-functioning software organization.

Hashing to Estimate the Size of a Stream

Problem: Estimate the number of distinct items in a data stream that is too large to fit in memory.

Solution: (in python)

import random

def randomHash(modulus):
   a, b = random.randint(0,modulus-1), random.randint(0,modulus-1)
   def f(x):
      return (a*x + b) % modulus
   return f

def average(L):
   return sum(L) / len(L)

def numDistinctElements(stream, numParallelHashes=10):
   modulus = 2**20
   hashes = [randomHash(modulus) for _ in range(numParallelHashes)]
   minima = [modulus] * numParallelHashes
   currentEstimate = 0

   for item in stream:
      hashValues = [h(item) for h in hashes]
      for j, newValue in enumerate(hashValues):
         if newValue < minima[j]:
            minima[j] = newValue

      currentEstimate = modulus / average(minima)

      yield currentEstimate

Discussion: The technique used here is to use random hash functions. The central idea is the same as the general principle presented in our recent post on hashing for load balancing. In particular, if you have an algorithm that works under the assumption that the data is uniformly random, then the same algorithm will work (up to a good approximation) if you process the data through a randomly chosen hash function.

So if we assume the data in the stream consists of $ N$ uniformly random real numbers between zero and one, what we would do is the following. Maintain a single number $ x_{\textup{min}}$ representing the minimum element seen so far, and update it every time we encounter a smaller number in the stream. A simple probability calculation or an argument by symmetry shows that the expected value of the minimum is $ 1/(N+1)$. So your estimate of $ N$ would be $ 1/x_{\textup{min}} - 1$. (The extra $ -1$ does not change much as we’ll see.) One can spend some time thinking about the variance of this estimate (indeed, our earlier post is great guidance for how such a calculation would work), but since the data is not random we need to do more work. If the elements are actually integers between zero and $ k$, then this estimate can be scaled by $ k$ and everything basically works out the same.
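The claim that the minimum of $ N$ uniform samples has expected value $ 1/(N+1)$ is easy to check empirically. The following is a small simulation sketch. The function name, the number of trials, and the fixed seed are arbitrary choices made for illustration.

```python
import random


def average_minimum(n, trials=2000, seed=0):
    """Average, over many trials, the minimum of n uniform samples from [0, 1)."""
    rng = random.Random(seed)  # fixed seed so repeated runs agree
    total = 0.0
    for _ in range(trials):
        total += min(rng.random() for _ in range(n))
    return total / trials


n = 99
observed = average_minimum(n)
print(observed, 1 / (n + 1))  # the two numbers should be close
```

A single minimum is a much noisier quantity than this average, which is one reason the solution code averages the minima over many hash functions before inverting.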

Processing the data through a hash function $ h$ chosen randomly from a 2-universal family (and we proved in the aforementioned post that this modulus thing is 2-universal) makes the outputs “essentially random” enough to have the above technique work with some small loss in accuracy. And to reduce variance, you can process the stream in parallel with many random hash functions. This rough sketch results in the code above. Indeed, before I state a formal theorem, let’s see the above code in action. First on truly random data:

S = [random.randint(1,2**20) for _ in range(10000)]

for k in range(10,301,10):
   for est in numDistinctElements(S, k):
      pass
   print(abs(est))

# output
18299.75567190227
7940.7497160166595
12034.154552410098
12387.19432959244
15205.56844547564
8409.913113220158
8057.99978043693
9987.627098464103
10313.862295081966
9084.872639057356
10952.745228373375
10360.569781803211
11022.469475216301
9741.250165892501
11474.896038520465
10538.452261306533
10068.793492995934
10100.266495424627
9780.532155130093
8806.382800033594
10354.11482578643
10001.59202254498
10623.87031408308
9400.404915767062
10710.246772348424
10210.087633885101
9943.64709187974
10459.610972568578
10159.60175069326
9213.120899718839

As you can see the output is never off by more than a factor of 2. Now with “adversarial data.”

S = range(10000) #[random.randint(1,2**20) for _ in range(10000)]

for k in range(10,301,10):
   for est in numDistinctElements(S, k):
      pass
   print(abs(est))

# output

12192.744186046511
15935.80547112462
10167.188106011634
12977.425742574258
6454.364151175674
7405.197740112994
11247.367453263867
4261.854392115023
8453.228233608026
7706.717624577393
7582.891328643745
5152.918628936483
1996.9365093316926
8319.20208545846
3259.0787592465967
6812.252720480753
4975.796789951151
8456.258064516129
8851.10133724288
7317.348220516398
10527.871485943775
3999.76974425661
3696.2999065091117
8308.843106180666
6740.999794281012
8468.603733730935
5728.532232608959
5822.072220349402
6382.349459544548
8734.008940222673

The estimates here are off by a factor of up to 5, and this estimate seems to get better as the number of hash functions used increases. The formal theorem is this:

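Theorem: Let $ S$ be the set of distinct items in the stream, let $ n = |S|$, and let $ m$ be the modulus of the hash function, with $ x_{\textup{min}}$ the smallest hash value seen. If $ m > 100 n$, then with probability at least 2/3 the estimate $ m / x_{\textup{min}}$ is between $ n/6$ and $ 6n$.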

We omit the proof (see below for references and better methods). As a quick analysis, since we’re only storing a constant number of integers at any given step, the algorithm has space requirement $ O(\log m) = O(\log n)$, and each update takes time polynomial in $ \log(m)$ (since we have to compute a multiplication and a reduction modulo $ m$).

This method is just the first ripple in a lake of research on this topic. The general area is called “streaming algorithms,” or “sublinear algorithms.” This particular problem, called cardinality estimation, is related to a family of problems called estimating frequency moments. The literature gets pretty involved in the various tradeoffs between space requirements and processing time per stream element.

As far as estimating cardinality goes, the first major results were due to Flajolet and Martin in 1983, where they provided a slightly more involved version of the above algorithm, which uses logarithmic space.

Later revisions to the algorithm (2003) got the space requirement down to $ O(\log \log n)$, which is exponentially better than our solution. And further tweaks and analysis improved the variance bounds to something like a multiplicative factor of $ \sqrt{m}$. This is called the HyperLogLog algorithm, and it has been tested in practice at Google.

Finally, a theoretically optimal algorithm (achieving an arbitrarily good estimate with logarithmic space) was presented and analyzed by Kane et al. in 2010.

Load Balancing and the Power of Hashing

Here’s a bit of folklore I often hear (and retell) that’s somewhere between a joke and deep wisdom: if you’re doing a software interview that involves some algorithms problem that seems hard, your best bet is to use hash tables.

More succinctly put: Google loves hash tables.

As someone with a passion for math and theoretical CS, I find it kind of silly and reductionist. But if you actually work with terabytes of data that can’t fit on a single machine, it also makes sense.

But to understand why hash tables are so applicable, you should have at least a fuzzy understanding of the math that goes into them, which is surprisingly unrelated to the actual act of hashing. Instead it’s the guarantees that a “random enough” hash provides that make it so useful. The basic intuition is that if you have an algorithm that works well assuming the input data is completely random, then you can probably get a good guarantee by preprocessing the input by hashing.

In this post I’ll explain the details, and show the application to an important problem that one often faces in dealing with huge amounts of data: how to allocate resources efficiently (load balancing). As usual, all of the code used in the making of this post is available on Github.

Next week, I’ll follow this post up with another application of hashing to estimating the number of distinct items in a set that’s too large to store in memory.

Families of Hash Functions

To emphasize which specific properties of hash functions are important for a given application, we start by introducing an abstraction: a hash function is just some computable function that accepts strings as input and produces numbers between 1 and $ n$ as output. We call the set of allowed inputs $ U$ (for “Universe”). A family of hash functions is just a set of possible hash functions to choose from. We’ll use a scripty $ \mathscr{H}$ for our family, and so every hash function $ h$ in $ \mathscr{H}$ is a function $ h : U \to \{ 1, \dots, n \}$.

You can use a single hash function $ h$ to maintain an unordered set of objects in a computer. This is a problem that needs solving because if you were to store items sequentially in a list, then to determine if a specific item is already in the list you potentially need to check every item in the list (or do something fancier). In any event, without hashing you have to spend some non-negligible amount of time searching. With hashing, you can choose the location of an element $ x \in U$ based on the value of its hash $ h(x)$. If you pick your hash function well, then you’ll have very few collisions and can deal with them efficiently. The relevant section on Wikipedia has more about the various techniques to deal with collisions in hash tables specifically, but we want to move beyond that in this post.

Here we have a family of random hash functions. So what’s the use of having many hash functions? You can pick a hash randomly from a “good” family of hash functions. While this doesn’t seem so magical, it has the informal property that it makes arbitrary data “random enough,” so that an algorithm which you designed to work with truly random data will also work with the hashes of arbitrary data. Moreover, even if an adversary knows $ \mathscr{H}$ and knows that you’re picking a hash function at random, there’s no way for the adversary to manufacture problems by feeding bad data. With overwhelming probability the worst-case scenario will not occur. Our first example of this is in load-balancing.

Load balancing and 2-universality

You can imagine load balancing in two ways, concretely and mathematically. In the concrete version you have a public-facing server that accepts requests from users, and forwards them to a back-end server which processes them and sends a response to the user. When you have a billion users and a million servers, you want to forward the requests in such a way that no server gets too many requests, or else the users will experience delays. Moreover, you’re worried that the League of Tanzanian Hackers is trying to take down your website by sending you requests in a carefully chosen order so as to screw up your load balancing algorithm.

The mathematical version of this problem usually goes with the metaphor of balls and bins. You have some collection of $ m$ balls and $ n$ bins in which to put the balls, and you want to put the balls into the bins. But there’s a twist: an adversary is throwing balls at you, and you have to put them into the bins before the next ball comes, so you don’t have time to remember (or count) how many balls are in each bin already. You only have time to do a small bit of mental arithmetic, sending ball $ i$ to bin $ f(i)$ where $ f$ is some simple function. Moreover, whatever rule you pick for distributing the balls in the bins, the adversary knows it and will throw balls at you in the worst order possible.

Figure: A young man applying his knowledge of balls and bins. That’s totally what he’s doing.

There is one obvious approach: why not just pick a uniformly random bin for each ball? The problem here is that we need the choice to be persistent. That is, if the adversary throws the same ball at us a second time, we need to put it in the same bin as the first time, and it doesn’t count toward the overall load. This is where the ball/bin metaphor breaks down. In the request/server picture, there is data specific to each user stored on the back-end server between requests (a session), and you need to make sure that data is not lost for some reasonable period of time. And if we were to save a uniform random choice after each request, we’d need to store a number for every request, which is too much. In short, we need the mapping to be persistent, but we also want it to be “like random” in effect.

So what do you do? The idea is to take a “good” family of hash functions $ \mathscr{H}$, pick one $ h \in \mathscr{H}$ uniformly at random for the whole game, and when you get a request/ball $ x \in U$ send it to server/bin $ h(x)$. Note that in this case, the adversary knows your universal family $ \mathscr{H}$ ahead of time, and it knows your algorithm of committing to some single randomly chosen $ h \in \mathscr{H}$, but the adversary does not know which particular $ h$ you chose.

The property of a family of hash functions that makes this strategy work is called 2-universality.

Definition: A family of functions $ \mathscr{H}$ from some universe $ U \to \{ 1, \dots, n \}$ is called 2-universal if, for every two distinct $ x, y \in U$, the probability over the random choice of a hash function $ h$ from $ \mathscr{H}$ that $ h(x) = h(y)$ is at most $ 1/n$. In notation,

$ \displaystyle \Pr_{h \in \mathscr{H}}[h(x) = h(y)] \leq \frac{1}{n}$

I’ll give an example of such a family shortly, but let’s apply this to our load balancing problem. Our load-balancing algorithm would fail if, with even some modest probability, there is some server that receives many more than its fair share ($ m/n$) of the $ m$ requests. If $ \mathscr{H}$ is 2-universal, then we can compute an upper bound on the expected load of a given server, say server 1. Specifically, pick any element $ x$ which hashes to 1 under our randomly chosen $ h$. Then we can compute an upper bound on the expected number of other elements that hash to 1. In this computation we’ll only use the fact that expectation splits over sums, and the definition of 2-universal. Call $ \mathbf{1}_{h(y) = 1}$ the random variable which is zero when $ h(y) \neq 1$ and one when $ h(y) = 1$, and call $ X = \sum_{y \in U} \mathbf{1}_{h(y) = 1}$. In words, $ X$ simply represents the number of inputs that hash to 1. Then

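$ \displaystyle \mathbb{E}[X] = \mathbb{E} \left[ \sum_{y \in U} \mathbf{1}_{h(y) = 1} \right] = \sum_{y \in U} \Pr[h(y) = 1] = 1 + \sum_{y \neq x} \Pr[h(y) = h(x)] \leq 1 + \frac{m-1}{n}$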

So in expectation server 1 gets roughly its fair share of requests. And clearly this doesn’t depend on the output hash being 1; it works for any server. There are two obvious questions.

  1. How do we measure the risk that, despite the expectation we computed above, some server is overloaded?
  2. If it seems like (1) is on track to happen, what can you do?

For 1 we’re asking to compute, for a given deviation $ t$, the probability that $ X - \mathbb{E}[X] > t$. This makes more sense if we jump to multiplicative factors, since it’s usually okay for a server to bear twice or three times its usual load, but not $ \sqrt{n}$ times more than its usual load. (Industry experts, please correct me if I’m wrong! I’m far from an expert on the practical details of load balancing.)

So we want to know the probability that $ X - \mathbb{E}[X] > t \cdot \mathbb{E}[X]$ for some small number $ t$, and we want this to get small quickly as $ t$ grows. This is where the Chebyshev inequality becomes useful. For those who don’t want to click the link, for our situation Chebyshev’s inequality is the statement that, for any random variable $ X$,

$ \displaystyle \Pr[|X - \mathbb{E}[X]| > t\mathbb{E}[X]] \leq \frac{\textup{Var}[X]}{t^2 \mathbb{E}^2[X]}.$

So all we need to do is compute the variance of the load of a server. It’s a bit of a hairy calculation to write down, but rest assured it doesn’t use anything fancier than the linearity of expectation and 2-universality. Let’s dive in. We start by writing the definition of variance as an expectation, and then we split $ X$ up into its parts, expand the product and group the parts.

$ \displaystyle \textup{Var}[X] = \mathbb{E}[(X - \mathbb{E}[X])^2] = \mathbb{E}[X^2] - (\mathbb{E}[X])^2$

The easy part is $ (\mathbb{E}[X])^2$: it’s just $ (1 + (m-1)/n)^2$. The hard part is $ \mathbb{E}[X^2]$, so let’s compute that.

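$ \displaystyle \mathbb{E}[X^2] = \mathbb{E} \left[ \sum_{x \in U} \sum_{y \in U} \mathbf{1}_{h(x) = 1} \mathbf{1}_{h(y) = 1} \right] = \sum_{x \in U} \Pr[h(x) = 1] + 2 \sum_{x < y} \Pr[h(x) = h(y) = 1]$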

In order to continue (and get a reasonable bound) we need an additional property of our hash family which is not immediately spelled out by 2-universality. Specifically, we need that for every $ h$ and $ i$, $ \Pr_x[h(x) = i] = O(\frac{1}{n})$. In other words, each hash function should evenly split the inputs across servers.

The reason this helps is that we can split $ \Pr[h(x) = h(y) = 1]$ into $ \Pr[h(x) = h(y) \mid h(x) = 1] \cdot \Pr[h(x) = 1]$. Using 2-universality to bound the left factor by $ 1/n$ and the extra property to bound the right factor by $ O(1/n)$, this quantity is at most $ O(1/n^2)$, and since there are $ \binom{m}{2}$ total terms in the double sum above, the whole thing is at most $ O(m/n + m^2 / n^2) = O(m^2 / n^2)$. Note that in our big-O analysis we’re assuming $ m$ is much bigger than $ n$.

Sweeping some of the details inside the big-O, this means that our variance is $ O(m^2/n^2)$, and so our bound on the deviation of $ X$ from its expectation by a multiplicative factor of $ t$ is at most $ O(1/t^2)$.

Now we’ve computed a bound on the probability that a single server is overloaded, but if we want to extend that to the worst-case server, the typical probability technique is to take the union bound over all servers. This means we just add up all the individual bounds and ignore how they relate. So the probability that some server has a load more than a multiplicative factor of $ t$ above its expectation is bounded from above by $ O(n/t^2)$. This is only less than one when $ t = \Omega(\sqrt{n})$, so all we can say with this analysis is that (with some small constant probability) no server will have a load worse than $ \sqrt{n}$ times more than the expected load.

So we have this analysis that seems not so good. If we have a million servers then the worst load on one server could potentially be a thousand times higher than the expected load. This doesn’t scale, and the problem could be in any (or all) of three places:

  1. Our analysis is weak, and we should use tighter bounds because the true max load is actually much smaller.
  2. Our hash families don’t have strong enough properties, and we should beef those up to get tighter bounds.
  3. The whole algorithm sucks and needs to be improved.

It turns out all three are true. One heuristic solution is easy and avoids all math. Have some second server (which does not process requests) count hash collisions. When some server exceeds a factor of $ t$ more than the expected load, send a message to the load balancer to randomly pick a new hash function from $ \mathscr{H}$ and for any requests that don’t have existing sessions (this is included in the request data), use the new hash function. Once the old sessions expire, switch any new incoming requests from those IPs over to the new hash function.

But there are much better solutions out there. Unfortunately their analyses are too long for a blog post (they fill multiple research papers). Fortunately their descriptions and guarantees are easy to describe, and they’re easy to program. The basic idea goes by the name “the power of two choices,” which we explored on this blog in a completely different context of random graphs.

In more detail, the idea is that you start by picking two random hash functions $ h_1, h_2 \in \mathscr{H}$, and when you get a new request, you compute both hashes, inspect the load of the two servers indexed by those hashes, and send the request to the server with the smaller load.

This has the disadvantage of requiring bidirectional talk between the load balancer and the server, rather than obliviously forwarding requests. But the advantage is an exponential decrease in the worst-case maximum load. In particular, the following theorem holds for the case where the hashes are fully random.

Theorem: Suppose one places $ m$ balls into $ n$ bins in order according to the following procedure: for each ball pick two uniformly random and independent integers $ 1 \leq i,j \leq n$, and place the ball into the bin with the smallest current size. If there are ties pick the bin with the smaller index. Then with high probability the largest bin has no more than $ \Theta(m/n) + O(\log \log (n))$ balls.
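To get a feel for the theorem, here is a small simulation of the fully random setting it describes. It is only a sketch: the function name throwBalls, the fixed seed, and the choice of $ m = n = 10000$ are mine, and a single run illustrates the gap rather than proving anything.

```python
import random

def throwBalls(numBalls, numBins, numChoices, rng):
   """Place balls one at a time, each into the least-loaded of
   numChoices uniformly random candidate bins."""
   bins = [0] * numBins
   for _ in range(numBalls):
      candidates = [rng.randrange(numBins) for _ in range(numChoices)]
      # Ties go to the bin with the smaller index, as in the theorem.
      target = min(candidates, key=lambda i: (bins[i], i))
      bins[target] += 1
   return bins

rng = random.Random(0)  # fixed seed only so that runs are repeatable
n = 10000
oneChoice = throwBalls(n, n, 1, rng)
twoChoices = throwBalls(n, n, 2, rng)
print(max(oneChoice), max(twoChoices))
```

On a typical run the two-choice maximum is noticeably smaller than the one-choice maximum, even though the only change is looking at one extra bin per ball.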

This theorem appears to have been proved in a few different forms, with the best analysis being by Berenbrink et al. You can improve the constant on the $ \log \log n$ by computing more than 2 hashes. How does this relate to a good family of hash functions, which is not quite fully random? Let’s explore the answer by implementing the algorithm in Python.

An example of universal hash functions, and the load balancing algorithm

In order to implement the load balancer, we need to have some good hash functions under our belt. We’ll go with the simplest example of a hash function that’s easy to prove nice properties for. Specifically each hash in our family just performs some arithmetic modulo a random prime.

Definition: Pick any prime $ p > m$, where $ m$ here denotes the number of bins, and for any $ 1 \leq a < p$ and $ 0 \leq b < p$ define $ h_{a,b}(x) = (ax + b \mod p) \mod m$. Let $ \mathscr{H} = \{ h_{a,b} \mid 0 \leq b < p, 1 \leq a < p \}$.

This family of hash functions is 2-universal.

Theorem: For every $ x \neq y \in \{0, \dots, p-1\}$,

$ \Pr_{h \in \mathscr{H}}[h(x) = h(y)] \leq 1/m$

Proof. To say that $ h(x) = h(y)$ is to say that $ ax+b = ay+b + i \cdot m \mod p$ for some integer $ i$. I.e., the two remainders of $ ax+b$ and $ ay+b$ are equivalent mod $ m$. The $ b$’s cancel and we can solve for $ a$

$ a = im (x-y)^{-1} \mod p$

Since $ a \neq 0$, there are $ p-1$ possible choices for $ a$. Moreover, there is no point to pick $ i$ bigger than $ p/m$ since we’re working modulo $ p$. So there are $ (p-1)/m$ possible values for the right hand side of the above equation. So if we chose them uniformly at random, (remember, $ x-y$ is fixed ahead of time, so the only choice is $ a, i$), then there is a $ (p-1)/m$ out of $ p-1$ chance that the equality holds, which is at most $ 1/m$. (To be exact you should account for taking a floor of $ (p-1)/m$ when $ m$ does not evenly divide $ p-1$, but it only decreases the overall probability.)

$ \square$

If $ m$ and $ p$ were equal then this would be even more trivial: it’s just the fact that there is a unique line passing through any two distinct points. While that’s obviously true from standard geometry, it is also true when you work with arithmetic modulo a prime. In fact, it works using arithmetic over any field.
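For small parameters the theorem can also be checked by brute force. The sketch below (with function names and the parameters $ p = 17$, $ m = 5$ chosen only for illustration) enumerates every $ h_{a,b}$ in the family and, for each pair $ x \neq y$, counts how many of them collide.

```python
from itertools import combinations

def collisionCount(p, m, x, y):
   """Number of pairs (a, b) for which h_{a,b}(x) == h_{a,b}(y)."""
   return sum(1
              for a in range(1, p)
              for b in range(p)
              if ((a*x + b) % p) % m == ((a*y + b) % p) % m)

def worstCollisionProbability(p, m):
   """Largest collision probability over all pairs of distinct inputs."""
   familySize = (p - 1) * p
   worst = max(collisionCount(p, m, x, y)
               for x, y in combinations(range(p), 2))
   return worst / float(familySize)

p, m = 17, 5
print(worstCollisionProbability(p, m) <= 1.0 / m)  # prints True
```

For these parameters every pair collides under 42 of the 272 functions in the family, a probability of about 0.15, which is indeed below $ 1/m = 0.2$.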

Implementing these hash functions is easier than shooting fish in a barrel.

import random

def draw(p, m):
   a = random.randint(1, p-1)
   b = random.randint(0, p-1)

   return lambda x: ((a*x + b) % p) % m

To encapsulate the process a little bit we implemented a UniversalHashFamily class which computes a random probable prime to use as the modulus and stores $ m$. The interested reader can see the Github repository for more.
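The class in the repository is not reproduced here. As a rough stand-in with the same constructor arguments and draw method used in the session below, one could write something like the following. Note the assumptions: it uses plain trial division rather than a probable-prime test, it picks the prime uniformly from the given bounds, and the helper isPrime and the attribute names p and m are my own.

```python
import random

def isPrime(k):
   """Trial division; fine for the small moduli used in a demo."""
   if k < 2:
      return False
   d = 2
   while d * d <= k:
      if k % d == 0:
         return False
      d += 1
   return True

class UniversalHashFamily(object):
   def __init__(self, numBins, primeBounds):
      low, high = primeBounds
      # Assumption: choose the modulus uniformly among primes in [low, high].
      primes = [k for k in range(low, high + 1) if isPrime(k)]
      self.p = random.choice(primes)
      self.m = numBins

   def draw(self):
      # Same arithmetic as the standalone draw(p, m) function above.
      a = random.randint(1, self.p - 1)
      b = random.randint(0, self.p - 1)
      p, m = self.p, self.m
      return lambda x: ((a*x + b) % p) % m
```

Fixing the prime once per family and drawing only $ a$ and $ b$ per hash matches the definition, where $ p$ is chosen first and $ \mathscr{H}$ is indexed by the pairs $ (a, b)$.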

If we try to run this and feed in a large range of inputs, we can see how the outputs are distributed. In this example $ m$ is a hundred thousand and $ n$ is a hundred (it’s not two terabytes, but cut me some slack: it’s a demo and I’ve only got my desktop!). So the expected bin size for any 2-universal family is just about 1,000.

>>> m = 100000
>>> n = 100
>>> H = UniversalHashFamily(numBins=n, primeBounds=[n, 2*n])
>>> results = []
>>> for simulation in range(100):
...    bins = [0] * n
...    h = H.draw()
...    for i in range(m):
...       bins[h(i)] += 1
...    results.append(max(bins))
...
>>> max(bins) # a single run
1228
>>> min(bins)
613
>>> max(results) # the max bin size over all runs
1228
>>> min(results)
1227

Indeed, the max is very close to the expected value.

But this example is misleading, because the point of this was that some adversary would try to screw us over by picking a worst-case input. If the adversary knew exactly which $ h$ was chosen (which it doesn’t) then the worst case input would be the set of all inputs that have the given hash output value. Let’s see it happen live.

>>> h = H.draw()
>>> badInputs = [i for i in range(m) if h(i) == 9]
>>> len(badInputs)
1227
>>> testInputs(n,m,badInputs,hashFunction=h)
[0, 0, 0, 0, 0, 0, 0, 0, 0, 1227, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

The expected size of a bin is 12, but as expected this is 100 times worse (linearly worse in $ n$). But if we instead pick a random $ h$ after the bad inputs are chosen, the result is much better.

>>> testInputs(n,m,badInputs) # randomly picks a hash
[19, 20, 20, 19, 18, 18, 17, 16, 16, 16, 16, 17, 18, 18, 19, 20, 20, 19, 18, 17, 17, 16, 16, 16, 16, 17, 18, 18, 19, 20, 20, 19, 18, 17, 17, 16, 16, 16, 16, 8, 8, 9, 9, 10, 10, 10, 10, 9, 9, 8, 8, 8, 8, 8, 8, 9, 9, 10, 10, 10, 10, 9, 9, 8, 8, 8, 8, 8, 8, 9, 9, 10, 10, 10, 10, 9, 8, 8, 8, 8, 8, 8, 8, 9, 9, 10, 10, 10, 10, 9, 8, 8, 8, 8, 8, 8, 8, 9, 9, 10]

However, if you re-ran this test many times, you’d eventually get unlucky and draw the hash function for which this actually is the worst input, and get a single huge bin. Other times you can get a bad hash in which two or three bins have all the inputs.

An interesting question is, what is really the worst-case input for this algorithm? I suspect it’s characterized by some choice of hash output values, taking all inputs for the chosen outputs. If this is the case, then there’s a tradeoff between the number of inputs you pick and how egregious the worst bin is. As an exercise to the reader, empirically estimate this tradeoff and find the best worst-case input for the adversary. Also, for your choice of parameters, estimate by simulation the probability that the max bin is three times larger than the expected value.

Now that we’ve played around with the basic hashing algorithm and made a family of 2-universal hashes, let’s see the power of two choices. Recall, this algorithm picks two random hash functions and sends an input to the bin with the smallest size. This obviously generalizes to $ k$ choices, although the theoretical guarantee only improves by a constant factor, so let’s implement the more generic version.

class ChoiceHashFamily(object):
   def __init__(self, hashFamily, queryBinSize, numChoices=2):
      self.queryBinSize = queryBinSize
      self.hashFamily = hashFamily
      self.numChoices = numChoices

   def draw(self):
      hashes = [self.hashFamily.draw()
                   for _ in range(self.numChoices)]

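      def h(x):
         # Candidate bins for x, one per underlying hash function.
         indices = [hashFn(x) for hashFn in hashes]
         counts = [self.queryBinSize(i) for i in indices]
         # Least-loaded candidate; ties go to the smaller bin index.
         count, index = min(zip(counts, indices))
         return index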

      return h

And if we test this with the bad inputs (as used previously, all the inputs that hash to 9), as a typical output we get

>>> bins
[15, 16, 15, 15, 16, 14, 16, 14, 16, 15, 16, 15, 15, 15, 17, 14, 16, 14, 16, 16, 15, 16, 15, 16, 15, 15, 17, 15, 16, 15, 15, 15, 15, 16, 15, 14, 16, 14, 16, 15, 15, 15, 14, 16, 15, 15, 15, 14, 17, 14, 15, 15, 14, 16, 13, 15, 14, 15, 15, 15, 14, 15, 13, 16, 14, 16, 15, 15, 15, 16, 15, 15, 13, 16, 14, 15, 15, 16, 14, 15, 15, 15, 11, 13, 11, 12, 13, 14, 13, 11, 11, 12, 14, 14, 13, 10, 16, 12, 14, 10]

And a typical list of bin maxima is

>>> results
[16, 16, 16, 18, 17, 365, 18, 16, 16, 365, 18, 17, 17, 17, 17, 16, 16, 17, 18, 16, 17, 18, 17, 16, 17, 17, 18, 16, 18, 17, 17, 17, 17, 18, 18, 17, 17, 16, 17, 365, 17, 18, 16, 16, 18, 17, 16, 18, 365, 16, 17, 17, 16, 16, 18, 17, 17, 17, 17, 17, 18, 16, 18, 16, 16, 18, 17, 17, 365, 16, 17, 17, 17, 17, 16, 17, 16, 17, 16, 16, 17, 17, 16, 365, 18, 16, 17, 17, 17, 17, 17, 18, 17, 17, 16, 18, 18, 17, 17, 17]

Those big bumps, which are scarily large, are the times when we picked an unlucky hash function, although this bad event would be proportionally less likely as you scale up. But in the good case the load is clearly more even than in the previous example, and the max load would get linearly smaller as you pick between a larger set of randomly chosen hashes (obviously).

Couple this with the technique of switching hash functions when you start to observe a large deviation, and you have yourself an elegant solution.

In addition to load balancing, hashing has a ton of applications. Remember, the main sign that you may want to use hashing is that you have an algorithm that works well when the input data is random. This comes up in streaming and sublinear algorithms, in data structure design and analysis, and many other places. We’ll be covering those applications in future posts on this blog.

Until then!