This is the fourth iteration of a Python script I wrote that splits overlapping ranges. It is the fastest version I have written so far, and it also works for all inputs. I tested it rigorously with randomly generated samples, comparing its output against the output of a method that is known to be correct, and found no errors.
I have literally millions of triplets. The first two elements of each are integers and the third is of arbitrary type. The first number is never larger than the second. Each triplet represents an inclusive integer range with associated data: the two numbers are the start and end of the range, and the third element is the data associated with the range.
Each triplet (start, end, data) is equivalent to a dictionary where the keys are all integers from start to end (including both) and the value for every key is data. For example, (42, 50, 'A') is equivalent to {42: 'A', 43: 'A', 44: 'A', 45: 'A', 46: 'A', 47: 'A', 48: 'A', 49: 'A', 50: 'A'}.
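The equivalence described above can be sketched in a couple of lines. The helper name expand is hypothetical and is not part of the script under review; it only illustrates the triplet-to-dictionary reading.

```python
def expand(start, end, data):
    """Expand an inclusive (start, end, data) triplet into its dictionary form."""
    # range() excludes its stop value, so add 1 to include `end`.
    return {key: data for key in range(start, end + 1)}


example = expand(42, 50, 'A')
print(len(example))              # number of integer keys covered by the range
print(example[42], example[50])  # both endpoints are included
```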
Now the ranges often overlap, and I wish to split them so that all resulting ranges are discrete (non-overlapping), each range carries exactly one datum, which is the latest one, and the number of ranges is minimal.
It is important to note that if two ranges (s1, e1, d1) and (s2, e2, d2) overlap, one is always completely inside the other, that is, one is always a complete subset: either s1 <= s2 <= e2 < e1 or s2 <= s1 <= e1 < e2. They never partially intersect in my data.
Two overlapping ranges can share the same data. In this case the smaller range needs to be ignored because it carries no new data. For example, (48, 49, 'A') is {48: 'A', 49: 'A'}, which is already contained in my previous example, so there is no data change.
If they overlap but have different data, then the smaller range's data overwrites the larger range's: the data of the overlap is inherited from the newer range and the overlapping portion of the larger range is deleted, similar to a dictionary update. For example, (43, 45, 'B') is {43: 'B', 44: 'B', 45: 'B'}. After combining it with the first example, the data would be {42: 'A', 43: 'B', 44: 'B', 45: 'B', 46: 'A', 47: 'A', 48: 'A', 49: 'A', 50: 'A'}, and the triplet representation is [(42, 42, 'A'), (43, 45, 'B'), (46, 50, 'A')].
Finally, sometimes the start of one range is the end of the previous range plus one and both share the same data. In that case they need to be joined: the previous range is extended to the end of the new range. For example, given another range (51, 60, 'A'), the expected output for all the ranges given so far should be [(42, 42, 'A'), (43, 45, 'B'), (46, 60, 'A')].
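The rules above (ignore, overwrite, join) can be checked on small inputs with a brute-force sketch that takes the dictionary analogy literally. The name discretize_naive is hypothetical and this is not the method under review: it assumes later triplets overwrite earlier ones, and it expands every range key by key, so it is only practical for small ranges.

```python
def discretize_naive(ranges):
    """Brute-force reference: expand to a dict, then re-compress into triplets."""
    values = {}
    for start, end, data in ranges:
        # Later triplets overwrite earlier ones, like dict.update().
        for key in range(start, end + 1):
            values[key] = data

    result = []
    for key in sorted(values):
        data = values[key]
        # Extend the previous run if it is adjacent and carries the same data.
        if result and result[-1][1] + 1 == key and result[-1][2] == data:
            result[-1][1] = key
        else:
            result.append([key, key, data])
    return [tuple(run) for run in result]


ranges = [(42, 50, 'A'), (43, 45, 'B'), (48, 49, 'A'), (51, 60, 'A')]
print(discretize_naive(ranges))
# [(42, 42, 'A'), (43, 45, 'B'), (46, 60, 'A')]
```

Because it never reasons about nesting, a sketch like this is slow but hard to get wrong, which makes it usable as a cross-check for small test cases.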
Code
from collections import deque


class Range:
    processed = deque()
    stack = deque()

    def __init__(self, ini, fin, data) -> None:
        self.ini = ini
        self.fin = fin
        self.data = data

    def __repr__(self):
        return f'Range({self.ini}, {self.fin}, {self.data})'

    def cover(self, fin):
        if fin <= self.fin:
            return True

    def join(self, ini, fin):
        if ini <= self.fin + 1:
            self.fin = fin
            return True

    def prepend(self, ini):
        if self.ini < ini:
            Range.processed.append(Range(self.ini, ini - 1, self.data))

    def climb(self, fin):
        if self.ini < (ini := fin + 1) <= self.fin:
            self.ini = ini

    def update(self, ini, fin, data):
        if self.ini == ini and self.fin == fin:
            self.data = data
            return True

    def process(self, ini, fin, data):
        ignore = past = False
        if self.data == data:
            ignore = self.cover(fin) or self.join(ini, fin)
        elif ini <= self.fin and not (ignore := self.update(ini, fin, data)):
            self.prepend(ini)

        if not ignore:
            if ini > self.fin:
                Range.processed.append(self)
                Range.stack.pop()
                past = True
            self.climb(fin)

        return not ignore, past


def discretize(ranges):
    Range.processed.clear()
    Range.stack.clear()
    for ini, fin, data in ranges:
        if not Range.stack:
            Range.stack.append(Range(ini, fin, data))
        else:
            append, past = Range.stack[-1].process(ini, fin, data)
            while past and Range.stack:
                _, past = Range.stack[-1].process(ini, fin, data)
            if append:
                Range.stack.append(Range(ini, fin, data))

    Range.processed.extend(reversed(Range.stack))
    return Range.processed
I ran hundreds of tests using examples generated using this and compared the results against the method from the answer linked above. The results were all correct: the script does exactly what I intend and there are no bugs.
It is very important to note that the input must be sorted by start ascending and end descending before it is processed with my method, though in my case the input is always pre-sorted. The method also won't work correctly if ranges partially intersect instead of one being a subset of the other. This is intentional: it never happens in my data, so I optimized that case out.
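The two preconditions stated above (sorted by start ascending and end descending, and overlaps only by nesting) can be verified before processing with a sketch like the following. The function check_input is a hypothetical helper, not part of the question's code. It is slightly more permissive than the question's strict inequalities, since it does not reject two ranges that share an endpoint.

```python
def check_input(ranges):
    """Return True if `ranges` is sorted by (start asc, end desc) and all
    overlaps are nestings, i.e. no two ranges partially intersect."""
    enclosing_ends = []  # ends of the ranges that still enclose the current start
    previous_key = None
    for start, end, _ in ranges:
        if start > end:
            return False  # not a valid inclusive range
        key = (start, -end)
        if previous_key is not None and key < previous_key:
            return False  # sort order violated
        previous_key = key
        # Drop enclosing ranges that ended before this one starts.
        while enclosing_ends and enclosing_ends[-1] < start:
            enclosing_ends.pop()
        # The innermost enclosing range must also contain this range's end.
        if enclosing_ends and end > enclosing_ends[-1]:
            return False  # partial intersection
        enclosing_ends.append(end)
    return True


print(check_input([(42, 50, 'A'), (43, 45, 'B'), (48, 49, 'A'), (51, 60, 'A')]))  # True
print(check_input([(0, 5, 'A'), (3, 8, 'B')]))  # False: the ranges partially intersect
```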
I did try using Interval Tree as suggested by this answer, but it is extremely slow and doesn't suit my needs:
data = [ (3, 2789, 2681), (27, 2349, 2004), (29, 1790, 2072), (32, 1507, 1173), (35, 768, 3453), (63, 222, 2108), (341, 501, 2938), (378, 444, 258), (870, 888, 1364), (1684, 1693, 880) ]
In [73]: %timeit merge_rows(data)
1.24 ms ± 9.43 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
My method is far faster. In fact, all my previous methods are much faster than that, and the current one is the fastest:
In [76]: %timeit discretize(data)
23.3 µs ± 525 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
Given this:
sample = [(2518812, 3931503981, 1222922854), (8283077, 3890282434, 2360189310), (21368257, 3571587114, 2651809994), (24571285, 2875098555, 1506778869), (40433776, 1751972813, 4162881198), (60822897, 1413187009, 921758666), (73435921, 1245663774, 3798627995), (169616863, 530686872, 1316302364), (273065748, 432363205, 3428529648), (291266380, 409571968, 4206669862), (558988640, 574048639, 4272236023), (619240775, 631101725, 1910576108), (727687992, 756241696, 3095538347), (825700858, 866696475, 2647033502), (853058882, 854026525, 2456782176), (967641241, 1001077664, 3556790281)]
The performance of my code is:
In [80]: %timeit discretize(sample)
36.8 µs ± 790 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
The code from the linked Stack Overflow answer is faster than mine, but it looks extremely ugly and I don't like it. It took me a while to understand how it works, and I tried to optimize and refactor it but found that hard to do, so I wrote another version, which is somehow a little slower than it:
In [81]: %timeit descretize(sample)
28.1 µs ± 1.06 µs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
How can I make my code more efficient?
(I used ini for start and fin for end to save characters and make the code more concise. They come from Latin: initus is Latin for start and is where you get __init__, and finis is Latin for end and is where you get "finish".)
Update
It turns out my original code doesn't work properly. If a sub-range shares the same data as its parent range, it should be ignored, but if the sub-range is separated from the parent range by some other range, it sometimes isn't properly ignored.
I only found this out after I had written a method to generate a large number of overlapping ranges. Since answers have already been posted, I am not allowed to edit the code.
Also, the answer by @booboo doesn't work properly for the test cases generated by my new method. I have tested extensively using both the original method and a version of it optimized by me, and they both fail for many test cases. They also frequently eat gibibytes of RAM (I have 16 GiB and they use 8 GiB+ in a short time), which froze my computer and forced me to reboot.
So there are no answers that are bounty-worthy. If no new answers are posted, the bounty will be wasted.
The function I wrote to generate test cases:
import random


def make_sample(num, lim, dat):
    if num * 2 > lim:
        lim = num * 2 + random.randrange(num)

    stack = [sorted(random.choices(range(lim), k=2*num))]
    ranges = []
    while stack:
        top = stack.pop(0)
        l = len(top)
        if l == 2:
            ranges.append(top)
        elif l == 4:
            ranges.extend([top[:2], top[2:]])
        else:
            choice = random.randrange(3)
            if not choice:
                ranges.append([top[0], top[-1]])
                stack.append(top[1:-1])
            elif choice == 1:
                i = l//3
                i += i % 2
                index = random.randrange(i, 2*i, 2)
                ranges.append(top[index:index+2])
                stack.extend([top[:index], top[index+2:]])
            else:
                index = random.randrange(2, l - 2, 2)
                stack.extend([top[:index], top[index:]])

    ranges.sort(key=lambda x: (x[0], -x[1]))
    return [(a, b, random.randrange(dat)) for a, b in ranges]
New answers must generate the correct output for make_sample(2048, 65536, 32). The correct output can be verified by comparing against the output of the descritize function found in the answer on Stack Overflow linked at the top of the question in the first paragraph.
And it also needs to be comparable in performance to it, and of course for it to be of comparable performance it must not eat up RAM and freeze the computer.
I edited the code that generates the samples to address bugs in the logic.
The code generates invalid input because the numbers are required to be unique: [(0, 1, 'A'), (0, 1, 'B')] is invalid, but cases like this can be generated by my code.
I first thought random.choices guarantees the choices to be unique if the sample size is less than the population size. I was wrong: uniqueness is only highly likely, because random.choices samples with replacement.
I fixed the code so that all numbers are guaranteed to be unique and invalid test cases won't be generated.
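One way to guarantee unique endpoints is to draw them without replacement using random.sample. This is a sketch only: the edited generator is not shown in the question, and unique_endpoints is a hypothetical helper name. It assumes lim is at least 2 * num, which make_sample already ensures by enlarging lim.

```python
import random


def unique_endpoints(num, lim):
    """Draw 2 * num distinct integers from range(lim), sorted ascending."""
    # random.sample draws without replacement, so no value can repeat;
    # it raises ValueError if 2 * num exceeds lim.
    return sorted(random.sample(range(lim), k=2 * num))


points = unique_endpoints(2048, 65536)
print(len(points) == len(set(points)))  # True: every endpoint is distinct
```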
I haven't tested the edited functions thoroughly yet, and my edit was immediately rolled back. I would argue that the code I edited isn't the code I wanted reviewed, so the rollback was a mistake by whoever made it, but I won't edit the code again lest it be rolled back again.
And yes, the input is guaranteed to be sorted in ascending order, as I mentioned in my question multiple times and demonstrated with the code that generates test cases.
Lastly, I am experiencing some "technical difficulties". More specifically, my computer has no internet connection right now because I just broke my modem (I have bought a new one online), and I am using my phone, which doesn't have a reliable VPN connection at the moment. In China Stack Exchange is throttled to the point of being nearly inaccessible, so updates will be much less frequent for now.
But I will try to award the bounty within the bounty grace period if I find a bounty-worthy answer (if new answers are posted) or if I determine @booboo's method to be correct. That is, if I manage to visit this site at all, of course.
It is summer and my apartment building has experienced yet another blackout, my computer is a desktop and I can't even boot my computer to test the code. I was out before and now my phone is running out of juice, I don't know how long the blackout will last so there is a chance I won't be able to award the bounty.
"My method is by far faster [than using Interval Trees]": did you establish that using the data presented above, or millions of triplets?
data and sample are in ascending order with respect to their first elements. Is this guaranteed?