Optimizing the day 6 Advent of Code 2022 challenge a little too much

WARNING: Spoilers ahead for the thing mentioned in the title.

Today’s (2022-12-06) AOC (Advent of Code) was easier than yesterday’s if you have been doing it in rust.

You basically just have to implement code that finds the ending index of the first sequence of N bytes in a given input where no two bytes are the same.

It is relatively trivial to do in rust. Here is my initial implementation.

fn all_distinct<T: Clone + Ord>(slice: &[T]) -> bool {
    let mut slice_copy = slice.to_vec();
    slice_copy.sort_unstable();
    slice_copy.windows(2).all(|pair| pair[0] != pair[1])
}
fn first_unique<T: Clone + Ord>(
    datastream: &[T],
    required_distinct: usize
) -> Option<usize> {
    datastream
        .windows(required_distinct)
        .enumerate()
        .find_map(|(i, slice)| all_distinct(slice).then_some(i + required_distinct))
}
fn main() {
    let input = std::io::stdin().lines().next().unwrap().unwrap().into_bytes();
    let output = first_unique(&input, 4).unwrap();
    println!("{output}");
}

ez-pz

Could have ended here, but as your scrollbar may hint, that wasn’t accounting for @Toby on the bevy discord implementing a benchmarking suite to compare the performance of each solutions people posted in the #off-topic channel.

From now on I’ll refer to required_distinct by W. I’ll also refer to the input length by N.

solution nameruntime (ns)W
hashet244,7604
manevillef242,4034
nicopap123,6004
snaketwix50,0004
vec153,1074
–––––––
hashet1,792,00014
manevillef1,239,00014
nicopap699,00014
snaketwix106,00014
vec1,764,67714

I’m fairing pretty well for an a priori very naive implementation optimized for readability. But snaketwix clearly has the lead. The reason is that, unlike all the other Solutions, they only iterates over the input once. They have a VecDeque to keep track of already seen characters and dropping them when they go out of the window. So their solution has a complexity of O(N).

My solution uses windows(required_distinct) and does an operation per element of that window (both a sort_unstable and a linear adjacent element comparison). It also allocates a whole Vec per stream element, going to a complexity of O(N * (W + W * log(W))) = O(N * W * log(W)). This is clearly visible when W is increased in the benchmarks.

First optimization

Suddenly invaded by a Zachtronician sense of optimization, I decided to go down the rabbit hole.

Allocation is a well known source of slowdown. Especially if recurrent. Each new allocation can put the allocated data at widely different position in memory. When reading that data, it might evict the CPU cache and result in a few thousands cycles of slowdown.

I decided to “hoist” (aka moving outside of a loop) the Vec allocated in all_distinct. Since I call all_distinct in a loop, I must now accept the Vec as a function parameter, I’ll just initialize the Vec in first_unique.

// Now we have a &mut Vec<T> as parameter    vvvvvvvvvvvvvvvvvvvvvvvv
fn all_distinct<T: Clone + Ord>(slice: &[T], collect_vec: &mut Vec<T>) -> bool {
    // `Vec::clear` removes all elements while keeping the pre-existing capacity.
    // Perfect, since `slice` will always have the same length.
    collect_vec.clear();
    slice.clone_into(collect_vec);
    collect_vec.sort_unstable();
    collect_vec.windows(2).all(|pair| pair[0] != pair[1])
}
fn first_unique<T: Clone + Ord>(
    datastream: &[T],
    required_distinct: usize
) -> Option<usize> {
    // The Vec is initialized here, outside of the find_map, before we iterate.
    let mut collect_vec = Vec::with_capacity(required_distinct);
    datastream
        .windows(required_distinct)
        .enumerate()
        .find_map(|(i, slice)| {
            // must pass it as argument vvvvvvvv
            all_distinct(slice, &mut collect_vec).then_some(i + required_distinct)
        })
}
fn main() {
    let input = std::io::stdin().lines().next().unwrap().unwrap().into_bytes();
    let output = first_unique(&input, 4).unwrap();
    println!("{output}");
}
solution nameruntime (ns)W
hashet344,4204
manevillef338,3754
nicopap166,6054
nicopap_hoist6,4104
snaketwix59,6854
vec212,3304
–––––––
hashet2,126,02514
manevillef1,654,48514
nicopap931,94614
nicopap_hoist618,20214
snaketwix130,35814
vec2,406,82514

(yep, that’s a 26× speedup on W = 4)

Notice that nicopap_hoist is two times faster than the snaketwix solution on W = 4. This may be because their solution rely heavily on HashMap and inserting/popping from a VecDeque. For small W, the overhead of the HashMap is greater than the additional operations of an O(N * W * log(W)) impl with no overhead otherwise.

Now the question of why we got a 27 times speedup for W = 4, while only having a 1.5 speedup for W = 14. I actually have no idea. It might be that allocating a Vec is as slow as W = 7.

Removing such overhead would indeed result in about 6 times speedup on very small W and <2 speedups on larger W. There might be other optimizations kicking in, but I wouldn’t be able to name them.

Second Optimization

snaketwix’s solution has a O(N) complexity. Could we reach such complexity level without what seems like overhead introduced by HashMap? Answer is: nay. But yet we can do better.

We have one major issue with our implementation: Sorting each windows. This represents a significant computing cost, repeated N times. Especially since we have to copy the whole slice to our Vec (even if pre-allocated) each time in addition to sorting it.

Why are we sorting? Simply put: we want to detect duplicates. There is of course better ways to detect duplicates.

A set

Most notably, we could – instead of copying each element to a new Vec, sorting them, and then comparing them pair-wise – just, you know, add them one after the others to a sort of set, and check each time if it is already in the set. This already reduces the time complexity of our stuff from O(N * W * log(W)) to O(N * W)!

But as you might have noticed, it’s a pretty bad idea. All those solutions that are about twice slower than my implementation do exactly this.

Why are they slower if they have a lower complexity? Simply put, O (big O notation, what I have been referring to as “complexity”) is a limit, this means that for small values of N and W, complexity is not a good indicator of runtime. Most of the time other constant overhead are so great that the complexity start to be useful to have an idea of runtime only after the variables in the expression are greater than a hundred thousands. For some algorithms constant overhead can be so great that the big O notation is only good as a purely theoretical construct.

In this particular case, the “better” solutions are slower because they use HashSet (or HashMap). Hashing is indeed a constant time operation, but it’s fairly costly. On top of that, they add a level of indirection that might kill somewhat cache coherency again. This makes using the rust standard library impractical.

So what alternative do we have? Well…

When doing some improvements to the bevy game engine, I came across bitsets. It’s a cool data structure where an individual element can be represented as a single bit. We can then check the bit for set ownership.

Our input is explicitly only single letters from the alphabet. So to encode a set of input elements, we only need a total of 26 bits! Hell, this means we could simply encode a bitset as a u32! (Has 32 > 26 bits)

struct AlphabetSet(u32);
impl AlphabetSet {
    const fn new() -> Self {
        Self(0)
    }
    /// Add letter to set, return `true` if it is already part of it.
    /// `symbol` must be an ascii latin alphabet letter.
    fn add_letter_or_contains(&mut self, symbol: u8) -> bool {
        let to_set = 1 << (symbol.wrapping_sub(b'a') as u32);
        let already_set = self.0 & to_set == to_set;
        self.0 |= to_set;
        already_set
    }
}

It’s not rocket science, and it’s very efficient. I’ve even used bitsets for my GBA game.

In that snippet, I subtract from symbol (the alphabet character from the input) the u8 value of a (so that the value of alphabet letters are in [0..26]), and then I “add it to the set” by setting the bit in the “set” (our u32) that corresponds to our letter to 1 (self.0 |= 1 << [alphabet_index]).

fn all_distinct(slice: &[u8]) -> bool {
    let mut set = AlphabetSet::new();
    !slice
        .iter()
        .any(|letter| set.add_letter_or_contains(*letter))
}
// sop stands for "start of packet"
fn first_sop<const W: usize>(datastream: &[u8]) -> Option<usize> {
    datastream
        .array_windows::<W>()
        .enumerate()
        .find_map(|(i, slice)| all_distinct(slice).then_some(i + W))
}
fn main() {
    let input = stdin().lines().next().unwrap().unwrap().into_bytes();
    let output = first_sop::<14>(&input).unwrap();
    println!("{output}");
}

(I also swapped to use array_windows over windows, for exoticism, since windows will definitively generate the same final assembly)

Then we just need to replace the all_distinct function to use the AlphabetSet. Specifically, we return from add_letter_or_contains a bool telling if the letter was already encountered.

That’s it. It’s still O(N * W), but W here grow by about 5 CPU cycle per W, so unlike the hoist solution (which still was relatively light on CPU cycle growth per W, I’d say 100 or so) there shouldn’t be a drastic difference between relatively close values of W. To rephrase: This means the runtime of the solution should grow still linearly but much slower than with hoist. Maybe so little that we’ll beat more theoretically efficient solutions (since we are talking about close to 0 overhead here).

Now let’s see our benchmarks…

solution nameruntime (ns)W
hashet344,3604
manevillef_2169,7674
nicopap158,9364
nicopap_hoist6,5274
nicopap_bitset5,1974
snaketwix58,1784
vec113,4734
–––––––
hashet2,144,09214
manevillef_21,085,11214
nicopap919,23214
nicopap_hoist623,37014
nicopap_bitset24,87114
snaketwix118,89914
vec2,141,10014

Outrageously fast!

Going further

I didn’t explore faster possibilities, but @Verte proposed an interesting solution, very similar to mine. Instead of using !windows.any([add_or_contains]), they simply Add all the letters from the window to the bitset and count how many bits are set (this can be done in a single instruction with u32::count_ones) If the number of bits is equal to the number of elements, it means all the elements are unique!

This is pretty good, and possibly more efficient than my solution. In my case, I early exit as soon as a duplicate is found. This translates in the generated assembly by a single jump instruction after each comparison. Due to branch prediction and pipeline invalidation, Verte’s solution might really be better.

The CPU has an instruction pipeline, it preloads the next few instructions it will execute. Problem is that when encountering a conditional jump instruction, the next few instructions become unknown since it’s impossible to know the value of the predicate before evaluating it. So if it didn’t load the correct instructions, it will need to clear the pipeline and reload the next instructions, causing a stall of a few cycles to a few hundred cycles depending on the context.

Just unconditionally adding up all the letters to the bit set, then acting on it fixes that problem, and enables the potential for vectorization (aka automatic SIMD instructions)

I didn’t test it yet, but I could expect a 4× speedup.