Optimizing the day 6 Advent of Code 2022 challenge a little too much
WARNING: Spoilers ahead for the thing mentioned in the title.
Today’s (2022-12-06) AoC (Advent of Code) was easier than yesterday’s if you have been doing it in Rust. You basically just have to implement code that finds the ending index of the first sequence of N bytes in a given input where no two bytes are the same. It is relatively trivial to do in Rust. Here is my initial implementation:
```rust
fn all_distinct<T: Clone + Ord>(slice: &[T]) -> bool {
    // Sort a copy of the window, then look for equal adjacent elements.
    let mut slice_copy = slice.to_vec();
    slice_copy.sort_unstable();
    slice_copy.windows(2).all(|pair| pair[0] != pair[1])
}

fn first_unique<T: Clone + Ord>(
    datastream: &[T],
    required_distinct: usize,
) -> Option<usize> {
    // Check every window of `required_distinct` elements, returning the
    // index just past the first all-distinct one.
    datastream
        .windows(required_distinct)
        .enumerate()
        .find_map(|(i, slice)| all_distinct(slice).then_some(i + required_distinct))
}

fn main() {
    let input = std::io::stdin().lines().next().unwrap().unwrap().into_bytes();
    let output = first_unique(&input, 4).unwrap();
    println!("{output}");
}
```
ez-pz
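As a quick sanity check, the sample from the puzzle statement makes a handy test. The expected answers (7 when looking for 4 distinct bytes, 19 for 14) are the ones given in the puzzle itself:

```rust
#[test]
fn puzzle_sample() {
    // Sample datastream from the day 6 puzzle statement.
    let sample = b"mjqjpqmgbljsphdztnvjfqwrcgsmlb";
    assert_eq!(first_unique(&sample[..], 4), Some(7));
    assert_eq!(first_unique(&sample[..], 14), Some(19));
}
```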
Could have ended here, but as your scrollbar may hint, that wasn’t accounting for @Toby on the bevy discord implementing a benchmarking suite to compare the performance of the solutions people posted in the #off-topic channel.
From now on I’ll refer to `required_distinct` as W. I’ll also refer to the input length as N.
| solution name | runtime (ns) | W |
|---|---|---|
| hashet | 244,760 | 4 |
| manevillef | 242,403 | 4 |
| nicopap | 123,600 | 4 |
| snaketwix | 50,000 | 4 |
| vec | 153,107 | 4 |
| hashet | 1,792,000 | 14 |
| manevillef | 1,239,000 | 14 |
| nicopap | 699,000 | 14 |
| snaketwix | 106,000 | 14 |
| vec | 1,764,677 | 14 |
I’m faring pretty well for an a priori very naive implementation optimized for readability. But `snaketwix` clearly has the lead. The reason is that, unlike all the other solutions, theirs only iterates over the input once. They have a `VecDeque` to keep track of already-seen characters, dropping them when they go out of the window. So their solution has a complexity of O(N).
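To illustrate the idea (this is my own reconstruction, not their actual code), a single-pass sliding window can keep a count of each byte currently inside the window and check how many distinct bytes it holds:

```rust
use std::collections::HashMap;

// Hypothetical O(N) single-pass version, in the spirit of snaketwix's
// solution: maintain counts for the bytes currently inside the window.
fn first_unique_sliding(datastream: &[u8], required_distinct: usize) -> Option<usize> {
    let mut counts: HashMap<u8, usize> = HashMap::new();
    for (i, &byte) in datastream.iter().enumerate() {
        *counts.entry(byte).or_insert(0) += 1;
        // Drop the byte that just slid out of the window.
        if i >= required_distinct {
            let old = datastream[i - required_distinct];
            let count = counts.get_mut(&old).unwrap();
            *count -= 1;
            if *count == 0 {
                counts.remove(&old);
            }
        }
        // One map entry per distinct byte: if there are as many entries as
        // the window is long, every byte in the window is unique.
        if counts.len() == required_distinct {
            return Some(i + 1);
        }
    }
    None
}
```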
My solution uses `windows(required_distinct)` and does an operation per element of that window (both a `sort_unstable` and a linear adjacent-element comparison). It also allocates a whole `Vec` per stream element, for a complexity of O(N * (W + W * log(W))) = O(N * W * log(W)). This is clearly visible when W is increased in the benchmarks.
First optimization
Suddenly invaded by a Zachtronician sense of optimization, I decided to go down the rabbit hole.
Allocation is a well-known source of slowdown, especially if recurrent. Each new allocation can put the allocated data at a widely different position in memory. When reading that data, it might evict the CPU cache and result in a few thousand cycles of slowdown.
I decided to “hoist” (aka move outside of the loop) the `Vec` allocated in `all_distinct`. Since I call `all_distinct` in a loop, I must now accept the `Vec` as a function parameter; I’ll just initialize the `Vec` in `first_unique`.
```rust
// Now we have a &mut Vec<T> as parameter   vvvvvvvvvvvvvvvvvvvvvvvv
fn all_distinct<T: Clone + Ord>(slice: &[T], collect_vec: &mut Vec<T>) -> bool {
    // `Vec::clear` removes all elements while keeping the pre-existing capacity.
    // Perfect, since `slice` will always have the same length.
    collect_vec.clear();
    slice.clone_into(collect_vec);
    collect_vec.sort_unstable();
    collect_vec.windows(2).all(|pair| pair[0] != pair[1])
}

fn first_unique<T: Clone + Ord>(
    datastream: &[T],
    required_distinct: usize,
) -> Option<usize> {
    // The Vec is initialized here, outside of the find_map, before we iterate.
    let mut collect_vec = Vec::with_capacity(required_distinct);
    datastream
        .windows(required_distinct)
        .enumerate()
        .find_map(|(i, slice)| {
            // must pass it as argument vvvvvvvv
            all_distinct(slice, &mut collect_vec).then_some(i + required_distinct)
        })
}

fn main() {
    let input = std::io::stdin().lines().next().unwrap().unwrap().into_bytes();
    let output = first_unique(&input, 4).unwrap();
    println!("{output}");
}
```
| solution name | runtime (ns) | W |
|---|---|---|
| hashet | 344,420 | 4 |
| manevillef | 338,375 | 4 |
| nicopap | 166,605 | 4 |
| nicopap_hoist | 6,410 | 4 |
| snaketwix | 59,685 | 4 |
| vec | 212,330 | 4 |
| hashet | 2,126,025 | 14 |
| manevillef | 1,654,485 | 14 |
| nicopap | 931,946 | 14 |
| nicopap_hoist | 618,202 | 14 |
| snaketwix | 130,358 | 14 |
| vec | 2,406,825 | 14 |
(yep, that’s a 26× speedup on W = 4)
Notice that `nicopap_hoist` is about nine times faster than the `snaketwix` solution on W = 4. This may be because their solution relies heavily on `HashMap` and on inserting/popping from a `VecDeque`. For small W, the overhead of the `HashMap` is greater than the additional operations of an O(N * W * log(W)) implementation with no overhead otherwise.
Now, the question of why we got a 26× speedup for W = 4 while only getting a 1.5× speedup for W = 14. I actually have no idea. It might be that allocating a `Vec` is about as slow as processing a window of W = 7. Removing such overhead would indeed result in about a 6× speedup on very small W and <2× speedups on larger W. There might be other optimizations kicking in, but I wouldn’t be able to name them.
Second optimization
`snaketwix`’s solution has an O(N) complexity. Could we reach that complexity level without the overhead seemingly introduced by `HashMap`? The answer is: nay. But we can still do better.
We have one major issue with our implementation: sorting each window. This represents a significant computing cost, repeated N times, especially since we have to copy the whole slice to our `Vec` (even if pre-allocated) each time, in addition to sorting it.
Why are we sorting? Simply put: we want to detect duplicates. There are, of course, better ways to detect duplicates.
A set
Most notably, we could, instead of copying each element to a new `Vec`, sorting them, and then comparing them pair-wise, just, you know, add them one after the other to a sort of set, and check each time if the element is already in the set. This already reduces our time complexity from O(N * W * log(W)) to O(N * W)!
But as you might have noticed, it’s a pretty bad idea. All those solutions that are about twice as slow as my implementation do exactly this.
Why are they slower if they have a lower complexity? Simply put, O (big O notation, what I have been referring to as “complexity”) is a limit. This means that for small values of N and W, complexity is not a good indicator of runtime. Most of the time, other constant overheads are so great that the complexity only starts to be a useful indicator of runtime once the variables in the expression grow past a hundred thousand. For some algorithms, the constant overhead can be so great that the big O notation is only good as a purely theoretical construct.
In this particular case, the “better” solutions are slower because they use `HashSet` (or `HashMap`). Hashing is indeed a constant-time operation, but it’s fairly costly. On top of that, those collections add a level of indirection that might again hurt cache coherency. This makes using the Rust standard library’s collections impractical here.
So what alternative do we have? Well…
When doing some improvements to the bevy game engine, I came across bitsets. It’s a cool data structure where an individual element can be represented as a single bit. We can then check the bit for set membership.
Our input is explicitly only single letters from the alphabet. So to encode a set of input elements, we only need a total of 26 bits! Hell, this means we could simply encode a bitset as a `u32` (which has 32 > 26 bits)!
```rust
struct AlphabetSet(u32);

impl AlphabetSet {
    const fn new() -> Self {
        Self(0)
    }

    /// Add letter to set, return `true` if it is already part of it.
    /// `symbol` must be an ascii latin alphabet letter.
    fn add_letter_or_contains(&mut self, symbol: u8) -> bool {
        let to_set = 1 << (symbol.wrapping_sub(b'a') as u32);
        let already_set = self.0 & to_set == to_set;
        self.0 |= to_set;
        already_set
    }
}
```
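To illustrate the intended semantics:

```rust
#[test]
fn alphabet_set_semantics() {
    let mut set = AlphabetSet::new();
    assert!(!set.add_letter_or_contains(b'a')); // 'a' was not in the set yet
    assert!(set.add_letter_or_contains(b'a')); // ...now it is
    assert!(!set.add_letter_or_contains(b'z')); // 'z' is new to the set
}
```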
It’s not rocket science, and it’s very efficient. I’ve even used bitsets for my GBA game.
In that snippet, I subtract from `symbol` (the alphabet character from the input) the `u8` value of `a` (so that the values of alphabet letters are in [0..26]), and then I “add it to the set” by setting the bit in the “set” (our `u32`) that corresponds to our letter to 1 (`self.0 |= 1 << [alphabet_index]`).
```rust
// `array_windows` is a nightly-only API, so this needs a nightly compiler.
#![feature(array_windows)]

use std::io::stdin;

fn all_distinct(slice: &[u8]) -> bool {
    let mut set = AlphabetSet::new();
    !slice
        .iter()
        .any(|letter| set.add_letter_or_contains(*letter))
}

// sop stands for "start of packet"
fn first_sop<const W: usize>(datastream: &[u8]) -> Option<usize> {
    datastream
        .array_windows::<W>()
        .enumerate()
        .find_map(|(i, slice)| all_distinct(slice).then_some(i + W))
}

fn main() {
    let input = stdin().lines().next().unwrap().unwrap().into_bytes();
    let output = first_sop::<14>(&input).unwrap();
    println!("{output}");
}
```
(I also swapped to `array_windows` over `windows`, for exoticism, since `windows` will most likely generate the same final assembly. Note that `array_windows` requires a nightly toolchain.)
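If you want to stay on stable Rust, the runtime-sized `windows` version works just as well; a sketch (with a hypothetical `first_sop_stable` name):

```rust
// Same algorithm on stable Rust, with the window size back to a runtime value:
fn first_sop_stable(datastream: &[u8], window: usize) -> Option<usize> {
    datastream
        .windows(window)
        .enumerate()
        .find_map(|(i, slice)| all_distinct(slice).then_some(i + window))
}
```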
Then we just need to replace the `all_distinct` function to use the `AlphabetSet`. Specifically, `add_letter_or_contains` returns a bool telling us whether the letter was already encountered.
That’s it. It’s still O(N * W), but here the runtime grows by only about 5 CPU cycles per unit of W, so unlike the hoist solution (which was already relatively light on CPU-cycle growth per W, I’d say 100 or so) there shouldn’t be a drastic difference between relatively close values of W. To rephrase: the runtime of this solution should still grow linearly, but much more slowly than with hoist. Maybe so little that we’ll beat more theoretically efficient solutions (since we are talking close to zero overhead here).
Now let’s see our benchmarks…
| solution name | runtime (ns) | W |
|---|---|---|
| hashet | 344,360 | 4 |
| manevillef_2 | 169,767 | 4 |
| nicopap | 158,936 | 4 |
| nicopap_hoist | 6,527 | 4 |
| nicopap_bitset | 5,197 | 4 |
| snaketwix | 58,178 | 4 |
| vec | 113,473 | 4 |
| hashet | 2,144,092 | 14 |
| manevillef_2 | 1,085,112 | 14 |
| nicopap | 919,232 | 14 |
| nicopap_hoist | 623,370 | 14 |
| nicopap_bitset | 24,871 | 14 |
| snaketwix | 118,899 | 14 |
| vec | 2,141,100 | 14 |
Outrageously fast!
Going further
I didn’t explore faster possibilities, but @Verte proposed an interesting solution, very similar to mine. Instead of using `!windows.any([add_or_contains])`, they simply add all the letters from the window to the bitset and count how many bits are set (this can be done in a single instruction with `u32::count_ones`). If the number of bits is equal to the number of elements, it means all the elements are unique!
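I haven’t seen their exact code, but a sketch of the idea (the function name is mine) could look like this:

```rust
// Branchless take on the window check: unconditionally set one bit per
// letter, then compare the popcount to the window length.
fn all_distinct_popcount(slice: &[u8]) -> bool {
    let mut set = 0u32;
    for letter in slice {
        set |= 1 << (letter.wrapping_sub(b'a') as u32);
    }
    // `count_ones` compiles down to a single `popcnt` instruction on x86_64
    // (with the right target features). One bit per distinct letter: if as
    // many bits are set as the window is long, no letter appeared twice.
    set.count_ones() as usize == slice.len()
}
```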
This is pretty good, and possibly more efficient than my solution. In my case, I exit early as soon as a duplicate is found. This translates, in the generated assembly, into a conditional jump after each comparison. Due to branch prediction and pipeline invalidation, Verte’s solution might really be better.
The CPU has an instruction pipeline: it preloads the next few instructions it will execute. The problem is that when it encounters a conditional jump instruction, the next few instructions become unknown, since it’s impossible to know the value of the predicate before evaluating it. So if it didn’t load the correct instructions, it needs to clear the pipeline and reload the next instructions, causing a stall of a few cycles to a few hundred cycles depending on the context.
Just unconditionally adding all the letters to the bitset, then acting on the result, fixes that problem and enables the potential for vectorization (aka automatic SIMD instructions). I haven’t tested it yet, but I could expect a 4× speedup.