
These are two Python scripts I wrote: one converts a gigantic text file, over 100MiB in size (the current version is 152.735MiB), into an SQLite3 database to use for geolocation, and the other queries the converted SQLite3 database to geolocate a given IP address.

The text file is the data dump of the IPFire location database, and it is updated daily. The repository is here. I found the repository because of this, and if you run my script you need to read the COPYING document, because my script automatically downloads the latest version of the text file.

My scripts do many things, and I will describe them one by one.

First, as mentioned above, the first script downloads countries.txt and database.txt from the repository. It then decodes the bytes into a string and splits it into a list of lines, in preparation for further processing.

The content of countries.txt is like this:

AD	EU	Andorra
AE	AS	United Arab Emirates
AF	AS	Afghanistan

There are 254 such lines: each line represents a country, and each line has 3 fields; the fields are separated by tabs, and they correspond to country code, continent code and country name, respectively. There is no continent name field, so I searched online and created the following dictionary:

CONTINENT_CODES = {
    "AF": "Africa",
    "AN": "Antarctica",
    "AS": "Asia",
    "EU": "Europe",
    "NA": "North America",
    "OC": "Oceania",
    "SA": "South America",
}

I split each line by tab, reordered the fields, and appended the corresponding continent name field; after this process the data looks like this:

[ ["AD", "Andorra", "EU", "Europe"], ["AE", "United Arab Emirates", "AS", "Asia"], ["AF", "Afghanistan", "AS", "Asia"] ] 

I then sorted the data (even though it is already sorted) and built a dictionary mapping the first element of each row (the country code) to the row's index, so I can find the row index for a given country code. The dictionary looks like this:

{ "AD": 0, "AE": 1, "AF": 2 } 

This way I can store a one-byte integer reference to each country instead of the country code itself, and save space.
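A minimal sketch of this preprocessing, using the three sample lines above and the CONTINENT_CODES dictionary (the real script works on the downloaded file, not a literal string):

# `raw` stands in for the decoded contents of countries.txt
raw = "AD\tEU\tAndorra\nAE\tAS\tUnited Arab Emirates\nAF\tAS\tAfghanistan"
rows = sorted(
    (code, name, continent, CONTINENT_CODES[continent])
    for code, continent, name in (line.split("\t") for line in raw.splitlines())
)
COUNTRY_INDEX = {row[0]: i for i, row in enumerate(rows)}
# rows[0] == ('AD', 'Andorra', 'EU', 'Europe')
# COUNTRY_INDEX == {'AD': 0, 'AE': 1, 'AF': 2}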

The content of database.txt is much more complicated. It starts with comments; comment lines begin with a hash sign (#) and are easy to ignore.

The comments are followed by the block for autonomous systems, whose lines look like this:

aut-num:        AS1
name:           LVLT-1

aut-num:        AS2
name:           UDEL-DCN

aut-num:        AS3
name:           MIT-GATEWAYS

There are many lines like these; 291,670 lines to be exact. Every two consecutive non-empty lines describe one autonomous system; I'll call such a block of lines an entry. Entries are separated by empty lines.

Each data line starts with some Latin letters and dashes (-), followed by a colon (:), then some spaces, and then arbitrary characters (as you will see later). These lines represent key/value pairs: the key is the string before the first colon, and the value is the string after the first colon and the padding spaces.

For autonomous systems, the lines correspond to the ASN and name of each autonomous system.

Splitting each data line on the first colon only (because the value can contain colons) gives the key and the padded value; calling value.strip() then removes leading and trailing spaces. I also removed the 'AS' prefix from each ASN and cast the rest to an integer, to save space.
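A quick illustration of that parsing on one sample line:

line = "aut-num:        AS13335"
key, value = line.split(":", 1)   # split only on the first colon
value = value.strip()             # "AS13335"
asn = int(value[2:])              # 13335 -- the "AS" prefix is dropped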

But identifying where an entry starts and ends is trickier than you might think. I can't just split on every third line by indexing, because this block is followed by the block for IPv4 networks, where entries have a variable number of lines. So I use a list as a cache of the current entry's lines, initially empty: on an empty line, if the cache is not empty, I collect the cached items and clear the cache; on a data line, I append the parsed key/value pair to the cache.
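A minimal sketch of that bookkeeping; lines stands in for the comment-free data lines, and handle_entry is a hypothetical callback for dispatching one complete entry to the right table:

cache = []
for line in lines:
    if not line:                      # empty line: entry boundary
        if cache:                     # ignore runs of empty lines
            handle_entry(cache)
            cache = []
    else:                             # data line: "key:    value"
        key, value = line.split(":", 1)
        cache.append((key, value.strip()))
if cache:                             # flush a final entry if the file
    handle_entry(cache)               # doesn't end with an empty line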

Next comes the block for IPv4 networks, whose lines look like this:

net:            1.0.0.0/8
country:        AU

net:            1.0.0.0/24
country:        AU
aut-num:        13335
is-anycast:     yes

net:            1.0.1.0/24
country:        CN

There are 4,774,208 lines in the IPv4 block, followed by 1,255,713 lines in the IPv6 block:

net:            2000::/12
aut-num:        3356

net:            2001::/32
aut-num:        6939

net:            2001:4:112::/48
aut-num:        112

As explained above, each run of consecutive non-empty lines is again equivalent to a dictionary. These dictionaries contain the slash notation of the network, the ASN of the network, the country of the network, and additional flags.

But there are several problems with these blocks. The first and most obvious is that these dictionaries don't always have the same number of keys; any number of them may be missing. The 'net' key is always present; all other keys can be missing.

A full dictionary has the following keys: "net", "aut-num", "country", "is-anonymous-proxy", "is-anycast", "is-satellite-provider" and "drop". A missing key means the omitted value is falsy: for country and ASN I fill the missing value with None; the last four are booleans whose value, when present, is always 'yes', so I replace 'yes' with True and fill missing ones with False.
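A minimal sketch of this normalization (the real script also renames the keys, which is omitted here):

DEFAULTS = {
    "net": None, "aut-num": None, "country": None,
    "is-anonymous-proxy": False, "is-anycast": False,
    "is-satellite-provider": False, "drop": False,
}

def normalize(entry: dict) -> dict:
    # a missing key falls back to its default; a literal "yes" becomes True
    return {
        key: True if entry.get(key, default) == "yes" else entry.get(key, default)
        for key, default in DEFAULTS.items()
    }

normalize({"net": "1.0.1.0/24", "country": "CN"})
# {'net': '1.0.1.0/24', 'aut-num': None, 'country': 'CN',
#  'is-anonymous-proxy': False, 'is-anycast': False,
#  'is-satellite-provider': False, 'drop': False}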

Then there are a lot of overlapping network ranges and adjacent ranges with the same data. I need to ignore the smaller range when overlapping ranges have the same data; when they have different data, I need to subtract the smaller range from the larger one and let the smaller range's data win inside it; and finally I need to merge adjacent ranges with the same data.

See this question for more details. Although I wrote a truly correct version that discretizes ranges which either don't intersect or overlap with one always a subset of the other (which is the case for this data), my code takes twice as long to execute as the accepted answer's solution on the same data, so I used the code from that answer instead. This is the only code in my scripts that wasn't originally written by me.
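To make the borrowed behaviour concrete, here is the discretize function from the first script below applied to small made-up ranges: a nested range with different data overrides its slice of the parent, while a nested range with the same data disappears into it, and adjacent pieces with equal data are merged:

ranges = [(0, 255, "A"), (16, 31, "B"), (32, 47, "A")]
print(discretize(ranges))
# [(0, 15, 'A'), (16, 31, 'B'), (32, 255, 'A')]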

The data also contains many duplicate combinations of ASN, country and booleans. I replace the country with its row reference in the countries table, pack the booleans into a 4-bit integer, and turn the triplet into a tuple so that it can be used as a dictionary key. I then look the tuple up in a table of unique combinations, add it if it isn't present, and replace the combination with the tuple's index in that table, to save space.
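A minimal sketch of the deduplication; COUNTRY_INDEX is the code-to-row mapping built earlier, and info_reference is a hypothetical helper isolating the idea:

NETWORK_INFO = {}  # (ASN, country_row, packed_flags) -> row index

def info_reference(asn, country_code, flags):
    # pack the four booleans (in fixed order) into a 4-bit integer
    packed = sum(bit for flag, bit in zip(flags, (8, 4, 2, 1)) if flag)
    key = (asn, COUNTRY_INDEX.get(country_code), packed)
    # setdefault assigns the next index only the first time a key is seen
    return NETWORK_INFO.setdefault(key, len(NETWORK_INFO))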

And then, because I merged the overlapping networks, I need to convert the ranges back into valid network slash notation. At first I simply took the binary decomposition of the number of addresses and emitted one network per power of two, but I got errors like this when I tried to verify the networks:

ValueError: 1.0.1.0/23 has host bits set 

So instead I use the lowest set bit of the range's start address to determine the largest aligned block (and thus the minimum slash value) that can start there. I take the binary decomposition of the address count as a sorted list of slash values and binary-search it (bisect_left) for the first value greater than or equal to that minimum. If one is found, I emit the network in slash notation and pop that value; otherwise I pop the value just before the insertion index and push back all slash values between the popped value (exclusive) and the minimum (inclusive), splitting the too-large block. I repeat this until the list is empty.
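For comparison, the standard library's summarize_address_range solves the same range-to-CIDR problem (though, as noted later, the ipaddress module is much slower on this data set). It also shows why the naive /23 fails: 1.0.1.0 is only aligned to a /24 boundary, so a 512-address range starting there must be split:

from ipaddress import IPv4Address, summarize_address_range

# 512 addresses starting at 1.0.1.0: a single /23 would need a start
# aligned to a 512-address boundary, which 1.0.1.0 is not
print(list(summarize_address_range(IPv4Address("1.0.1.0"),
                                   IPv4Address("1.0.2.255"))))
# [IPv4Network('1.0.1.0/24'), IPv4Network('1.0.2.0/24')]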

The second script geolocates a given IP address using the database created by the first script. First of all, I benchmarked SQLite3, and with my disk-based database any query takes milliseconds because of I/O delay. This is unacceptable, so I load everything into memory upfront and use SQLite3 only to serialize the data.

I check whether the IP address is reserved before any actual processing; an exception is raised if it is. The range an IP belongs to is found using binary search on the start column: if the IP is less than or equal to the corresponding end IP, the IP belongs to that range. The corresponding third column holds an index into the NETWORK_INFORMATION list, and the ASN and country are then retrieved from it by indexing again.
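A minimal sketch of that lookup, with STARTS, ENDS and INFOREFS standing in for the parallel, start-sorted sequences loaded from the IPv4 (or IPv6) table:

from bisect import bisect

def find_inforef(ip: int):
    i = bisect(STARTS, ip) - 1        # rightmost range with start <= ip
    if i >= 0 and ip <= ENDS[i]:      # is ip inside that range?
        return INFOREFS[i]            # index into NETWORK_INFORMATION
    return None                       # ip falls in a gap between ranges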


Script to create the database

import gc
import re
import requests
import sqlite3
import time
from bisect import bisect_left
from collections import deque
from enum import Enum
from ipaddress import IPv4Network, IPv6Network
from pathlib import Path
from typing import Any, Deque, Generator, Iterable, Iterator, List, Sequence, Tuple

script_start = time.time()
MAX_SQLITE_INT = 2**63 - 1
MAX_IPV4 = 2**32 - 1
MAX_IPV6 = 2**128 - 1
IPV6DIGITS = set("0123456789abcdef:")
le255 = r"(25[0-5]|2[0-4]\d|[01]?\d\d?)"
IPV4_PATTERN = re.compile(rf"^{le255}\.{le255}\.{le255}\.{le255}$")
EMPTY = re.compile(r":?\b(?:0\b:?)+")
FIELDS = re.compile(r"::?|[\da-f]{1,4}")
JSON_TYPES = {True: "true", False: "false", None: "null"}
KEYS_RENAME = [
    ("net", "network"),
    ("aut-num", "ASN"),
    ("country", "country_code"),
    ("is-anonymous-proxy", "is_anonymous_proxy"),
    ("is-anycast", "is_anycast"),
    ("is-satellite-provider", "is_satellite_provider"),
    ("drop", "bad"),
]
CONTINENT_CODES = {
    "AF": "Africa",
    "AN": "Antarctica",
    "AS": "Asia",
    "EU": "Europe",
    "NA": "North America",
    "OC": "Oceania",
    "SA": "South America",
}
DEFAULTS = {
    "network": None,
    "ASN": None,
    "country_code": None,
    "is_anonymous_proxy": False,
    "is_anycast": False,
    "is_satellite_provider": False,
    "bad": False,
}
COLUMNS_SQL = """
create table if not exists {}(
    network text primary key,
    network_info int not null
);
"""
Path("D:/network_guard/IPFire_locations.db").unlink(missing_ok=True)
sqlite3.register_adapter(int, lambda x: hex(x) if x > MAX_SQLITE_INT else x)
sqlite3.register_converter("integer", lambda b: int(b, 16 if b[:2] == b"0x" else 10))
conn = sqlite3.connect(
    "D:/network_guard/IPFire_locations.db", detect_types=sqlite3.PARSE_DECLTYPES
)
cur = conn.cursor()
data_loading_start = time.time()
COUNTRY_CODES = requests.get(
    "https://git.ipfire.org/?p=location/location-database.git;a=blob_plain;f=countries.txt;hb=HEAD"
).content
Path("D:/network_guard/countries.txt").write_bytes(COUNTRY_CODES)
COUNTRY_CODES = [i.split("\t") for i in COUNTRY_CODES.decode().splitlines()]
COUNTRY_CODES = sorted([(a, c, b, CONTINENT_CODES[b]) for a, b, c in COUNTRY_CODES])
COUNTRY_INDEX = {e[0]: i for i, e in enumerate(COUNTRY_CODES)}
DATA = requests.get(
    "https://git.ipfire.org/?p=location/location-database.git;a=blob_plain;f=database.txt;hb=HEAD"
).content
Path("D:/network_guard/database.txt").write_bytes(DATA)
DATA = (i for i in DATA.decode().splitlines() if not i.startswith("#"))
print(f"downloading data: {time.time() - data_loading_start:.3f} seconds")
cur.execute(
    """
    create table if not exists Country_Codes(
        country_code text primary key,
        country_name text not null,
        continent_code text not null,
        continent_name text not null
    )
    """
)
cur.execute(
    """
    create table if not exists Autonomous_Systems(
        ASN int primary key,
        entity text not null
    )
    """
)
cur.execute(
    """
    create table if not exists Network_Information(
        ASN int,
        country_code int,
        booleans int
    )
    """
)
cur.execute(COLUMNS_SQL.format("IPv4"))
cur.execute(COLUMNS_SQL.format("IPv6"))
cur.executemany("insert into Country_Codes values (?, ?, ?, ?);", COUNTRY_CODES)
conn.commit()
KEY_ORDER = [
    "ASN",
    "country_code",
    "is_anonymous_proxy",
    "is_anycast",
    "is_satellite_provider",
    "bad",
]


def parse_ipv4(ip: str) -> int:
    assert (match := IPV4_PATTERN.match(ip))
    a, b, c, d = match.groups()
    return (int(a) << 24) + (int(b) << 16) + (int(c) << 8) + int(d)


def to_ipv4(n: int) -> str:
    assert 0 <= n <= MAX_IPV4
    return f"{n >> 24 & 255}.{n >> 16 & 255}.{n >> 8 & 255}.{n & 255}"


def preprocess_ipv6(ip: str) -> Tuple[list, bool, int]:
    assert 2 < len(ip) <= 39 and not set(ip) - IPV6DIGITS
    compressed = False
    terminals = False
    segments = ip.lower().split(":")
    if not segments[0]:
        assert not segments[1]
        terminals = 1
        compressed = True
        segments = segments[2:]
    if not segments[-1]:
        assert not compressed and not segments[-2]
        terminals = 2
        segments = segments[:-2]
    return segments, compressed, terminals


def split_ipv6(ip: str) -> Deque[str]:
    segments, compressed, terminals = preprocess_ipv6(ip)
    chunks = deque()
    if terminals == 1:
        chunks.append("::")
    length = len(segments)
    minus_one = compressed or terminals or "" in segments
    if (minus_one and length == 8) or (not minus_one and length < 8) or length > 8:
        raise ValueError(f"{ip} is invalid")
    for seg in segments:
        if not seg:
            assert not compressed
            chunks.append("::")
            compressed = True
        else:
            assert len(seg) <= 4
            chunks.append(seg)
    if terminals == 2:
        chunks.append("::")
    return chunks


def parse_ipv6(ip: str) -> int:
    if ip == "::":
        return 0
    segments = split_ipv6(ip)
    pos = 7
    n = 0
    for i, seg in enumerate(segments):
        if seg == "::":
            pos = len(segments) - i - 2
        else:
            n += int(seg, 16) << pos * 16
            pos -= 1
    return n


def to_ipv6(n: int, compress: bool = True) -> str:
    assert 0 <= n <= MAX_IPV6
    ip = "{:039_x}".format(n).replace("_", ":")
    if compress:
        ip = ":".join(s.lstrip("0") if s != "0000" else "0" for s in ip.split(":"))
        longest = max(EMPTY.findall(ip), key=len, default="")
        if len(longest) > 3:
            ip = ip.replace(longest, "::", 1)
    return ip


class IPv4(Enum):
    parser = parse_ipv4
    formatter = to_ipv4
    power = 32
    maximum = MAX_IPV4


class IPv6(Enum):
    parser = parse_ipv6
    formatter = to_ipv6
    power = 128
    maximum = MAX_IPV6


class IP_Address:
    def __str__(self) -> str:
        return self.string

    def __int__(self) -> int:
        return self.integer

    def __repr__(self) -> str:
        return f"IP_Address(integer={self.integer}, string='{self.string}', version={self.version.__name__})"

    def __init__(self, value: int | str, version: IPv4 | IPv6 = None) -> None:
        if isinstance(value, str):
            self.from_string(value, version)
        else:
            self.from_integer(value, version)
        self.maxpower = 32 if self.version == IPv4 else 128
        self.hexadecimal = (
            f"{self.integer:08x}" if version == IPv4 else f"{self.integer:032x}"
        )
        self.pointer = None

    def from_string(self, value: str, version: IPv4 | IPv6) -> None:
        if not version:
            version = IPv4 if IPV4_PATTERN.match(value) else IPv6
        self.integer = version.parser(value)
        self.string = value
        self.version = version

    def from_integer(self, value: int, version: IPv4 | IPv6) -> None:
        assert isinstance(value, int)
        if not version:
            version = IPv4 if 0 <= value <= MAX_IPV4 else IPv6
        self.string = version.formatter(value)
        self.integer = value
        self.version = version

    def get_pointer(self) -> str:
        if not self.pointer:
            if self.version == IPv4:
                self.pointer = ".".join(self.string.split(".")[::-1]) + ".in-addr.arpa"
            else:
                self.pointer = ".".join(self.hexadecimal[::-1]) + ".ip6.arpa"
        return self.pointer


def parse_network(network: str) -> Tuple[int, int]:
    start, slash = network.split("/")
    start = IP_Address(start)
    slash = int(slash)
    count = 2 ** (start.maxpower - slash)
    end = IP_Address(start.integer + count - 1)
    return start.integer, end.integer


ASN = deque()
IPV4_TABLE = deque()
IPV6_TABLE = deque()
NETWORK_INFO = {}


def parse_entry(entry: Deque[tuple]) -> list:
    entry = dict(entry)
    entry = {
        new_key: v if (v := entry.get(key, DEFAULTS[new_key])) != "yes" else True
        for key, new_key in KEYS_RENAME
    }
    assert (network := entry["network"])
    asn = entry["ASN"]
    if asn != None:
        entry["ASN"] = int(asn)
    info = [entry[k] for k in KEY_ORDER]
    info[1] = COUNTRY_INDEX.get(info[1])
    number = sum(p for b, p in zip(info[2:], (8, 4, 2, 1)) if b)
    return [
        *parse_network(network),
        NETWORK_INFO.setdefault((*info[:2], number), len(NETWORK_INFO)),
    ]


data_processing_start = time.time()
entry = deque()
for line in DATA:
    if not line:
        if entry:
            if entry[0][0] == "aut-num":
                ASN.append((int(entry[0][1][2:]), entry[1][1]))
            else:
                assert entry[0][0] == "net"
                ip = entry[0][1].split("/")[0]
                table = IPV4_TABLE if IPV4_PATTERN.match(ip) else IPV6_TABLE
                table.append(parse_entry(entry))
            entry = deque()
    else:
        key, val = line.split(":", 1)
        entry.append((key, val.strip()))
print(f"converting data: {time.time() - data_processing_start:.3f} seconds")


def discrete_segments(ranges: List[Tuple[int, int, Any]]) -> Generator:
    ranges.append((1e309, None, None))
    stack = []
    current = ranges[0][0]
    for start, end, data in ranges:
        while stack and stack[-1][0] < start:
            end2, data2 = stack.pop()
            if current <= end2:
                yield current, end2, data2
                current = end2 + 1
        if stack and current < start:
            yield current, start - 1, stack[-1][1]
            current = start
        if not stack or stack[-1][1] != data or end > stack[-1][0]:
            stack.append((end, data))


def merge(segments: Iterator) -> Generator:
    start, end, data = next(segments)
    for start2, end2, data2 in segments:
        if data == data2 and end + 1 == start2:
            end = end2
        else:
            yield start, end, data
            start, end, data = start2, end2, data2
    yield start, end, data


def discretize(ranges: List[Tuple[int, int, Any]]) -> List[Tuple[int, int, Any]]:
    if not ranges:
        return []
    result = list(merge(discrete_segments(ranges)))
    ranges.pop()
    return result


def binary_decompose(n: int, power: int) -> List[int]:
    return [
        power - i for i, d in enumerate(bin(n).removeprefix("0b")[::-1]) if d == "1"
    ][::-1]


def to_network(item: Sequence, version: IPv4 | IPv6 = IPv4) -> Generator[tuple, None, None]:
    start, end, data = item
    power = version.power.value
    formatter = version.formatter
    powers = binary_decompose(end - start + 1, power)
    while powers:
        min_slash = f"{start:0{power}b}".rindex("1") + 1
        index = bisect_left(powers, min_slash)
        if index == (l := len(powers)):
            for i, n in enumerate(
                range(powers.pop(-1) + 1, min_slash + 1), start=l - 1
            ):
                powers.insert(i, n)
            continue
        yield (f"{formatter(start)}/{(slash := powers.pop(index))}", data)
        start += 1 << power - slash


data_analyzing_start = time.time()
ANALYZED_IPV4 = []
for networks in map(to_network, merge(discrete_segments(IPV4_TABLE))):
    ANALYZED_IPV4.extend(networks)
del IPV4_TABLE
gc.collect()
IPV4_TABLE = ANALYZED_IPV4
ANALYZED_IPV6 = []
for networks in map(
    lambda x: to_network(x, IPv6), merge(discrete_segments(IPV6_TABLE))
):
    ANALYZED_IPV6.extend(networks)
del IPV6_TABLE
gc.collect()
IPV6_TABLE = ANALYZED_IPV6
NETWORK_INFO = list(NETWORK_INFO)
print(f"analyzing data: {time.time() - data_analyzing_start:.3f} seconds")
data_saving_start = time.time()
cur.executemany("insert into Autonomous_Systems values (?, ?);", ASN)
cur.executemany(f"insert into IPv4 values ({', '.join(['?']*2)});", IPV4_TABLE)
cur.executemany(f"insert into IPv6 values ({', '.join(['?']*2)});", IPV6_TABLE)
cur.executemany(
    f"insert into Network_Information values ({', '.join(['?']*3)});", NETWORK_INFO
)
conn.commit()
print(f"SQLite3 serializing: {time.time() - data_saving_start:.3f} seconds")


def json_repr(row: Sequence) -> str:
    items = deque()
    for e in row:
        if isinstance(e, (int, float)) and not isinstance(e, bool):
            items.append(f"{e}")
        elif isinstance(e, str):
            item = e.replace('"', '\\"')
            items.append(f'"{item}"')
        else:
            items.append(JSON_TYPES[e])
    return "[" + ", ".join(items) + "]"


def pretty_table(table: Iterable[Sequence]) -> str:
    return "[\n" + "\n".join(f"\t{json_repr(row)}," for row in table)[:-1] + "\n]"


data_serializing_start = time.time()
Path("D:/network_guard/IPv4_table.json").write_text(pretty_table(IPV4_TABLE))
Path("D:/network_guard/IPv6_table.json").write_text(pretty_table(IPV6_TABLE))
Path("D:/network_guard/Autonomous_Systems.json").write_text(
    pretty_table(ASN), encoding="utf8"
)
Path("D:/network_guard/Country_Codes.json").write_text(
    pretty_table(COUNTRY_CODES), encoding="utf8"
)
Path("D:/network_guard/Network_Information.json").write_text(pretty_table(NETWORK_INFO))
print(f"JSON serializing: {time.time() - data_serializing_start:.3f} seconds")
data_verifying_start = time.time()
IPV4_NETWORKS = [IPv4Network(a) for a, _ in IPV4_TABLE]
IPV6_NETWORKS = [IPv6Network(a) for a, _ in IPV6_TABLE]
for a, b in zip(IPV4_NETWORKS, IPV4_NETWORKS[1:]):
    assert a.network_address + a.num_addresses <= b.network_address
for a, b in zip(IPV6_NETWORKS, IPV6_NETWORKS[1:]):
    assert a.network_address + a.num_addresses <= b.network_address
print(f"verifying data: {time.time() - data_verifying_start:.3f} seconds")
print(f"full script: {time.time() - script_start:.3f} seconds")

Script to query the database

import gc
import json
import re
import sqlite3
from bisect import bisect
from collections import deque
from enum import Enum
from typing import Deque, Tuple

MAX_IPV4 = 2**32 - 1
MAX_IPV6 = 2**128 - 1
MAX_SQLITE_INT = 2**63 - 1
le255 = r"(25[0-5]|2[0-4]\d|[01]?\d\d?)"
IPV4_PATTERN = re.compile(rf"^{le255}\.{le255}\.{le255}\.{le255}$")
IPV6DIGITS = set("0123456789abcdef:")
EMPTY = re.compile(r":?\b(?:0\b:?)+")
FIELDS = re.compile(r"::?|[\da-f]{1,4}")
SUAB4 = [
    (0x00000000, 0x00FFFFFF),
    (0x0A000000, 0x0AFFFFFF),
    (0x64400000, 0x647FFFFF),
    (0x7F000000, 0x7FFFFFFF),
    (0xA9FE0000, 0xA9FEFFFF),
    (0xAC100000, 0xAC1FFFFF),
    (0xC0000000, 0xC00000FF),
    (0xC0000200, 0xC00002FF),
    (0xC0586300, 0xC05863FF),
    (0xC0A80000, 0xC0A8FFFF),
    (0xC6120000, 0xC613FFFF),
    (0xC6336400, 0xC63364FF),
    (0xCB007100, 0xCB0071FF),
    (0xE0000000, 0xFFFFFFFF),
]
SUAB6 = [
    (0x0, 0x1),
    (0xFFFF << 0x20, (1 << 0x30) - 1),
    (0xFFFF << 0x30, (0xFFFF0000 + 1 << 0x20) - 1),
    (0x64FF9B << 0x60, ((0x64FF9B << 0x40) + 1 << 0x20) - 1),
    (0x64FF9B0001 << 0x50, (0x327FCD8000 + 1 << 0x51) - 1),
    (0x1 << 0x78, ((0x1 << 0x38) + 1 << 0x40) - 1),
    (0x2001 << 0x70, (0x20010000 + 1 << 0x60) - 1),
    (0x1000801 << 0x65, (0x2001002 + 1 << 0x64) - 1),
    (0x40021B7 << 0x63, (0x20010DB8 + 1 << 0x60) - 1),
    (0x1001 << 0x71, (0x2002 + 1 << 0x70) - 1),
    (0x3F << 0x7A, (0x7E + 1 << 0x79) - 1),
    (0x1FD << 0x77, (0x3FA + 1 << 0x76) - 1),
    (0xFF << 0x78, (1 << 0x80) - 1),
]
SUAB6_STARTS, SUAB6_ENDS = zip(*SUAB6)
SUAB6_FIRST_FIELDS = {f for s, e in SUAB6 for f in range(s >> 112, (e >> 112) + 1)}
SUAB4_STARTS, SUAB4_ENDS = zip(*SUAB4)
SUAB4_FIRST_BYTES = {b for s, e in SUAB4 for b in range(s >> 24, (e >> 24) + 1)}
COUNTRY_FIELDS = ("Country_Code", "Country_Name", "Continent_Code", "Continent_Name")
FLAGS = ("is_anonymous_proxy", "is_anycast", "is_satellite_provider", "bad")


def is_reserved_ipv4(ip: int) -> bool:
    if ip >> 24 not in SUAB4_FIRST_BYTES:
        return False
    i = bisect(SUAB4_STARTS, ip) - 1
    return SUAB4_STARTS[i] <= ip <= SUAB4_ENDS[i]


def is_reserved_ipv6(ip: int) -> bool:
    if ip >> 112 not in SUAB6_FIRST_FIELDS:
        return False
    i = bisect(SUAB6_STARTS, ip) - 1
    return SUAB6_STARTS[i] <= ip <= SUAB6_ENDS[i]


def parse_ipv4(ip: str) -> int:
    assert (match := IPV4_PATTERN.match(ip))
    a, b, c, d = match.groups()
    return (int(a) << 24) + (int(b) << 16) + (int(c) << 8) + int(d)


def to_ipv4(n: int) -> str:
    assert 0 <= n <= MAX_IPV4
    return f"{n >> 24 & 255}.{n >> 16 & 255}.{n >> 8 & 255}.{n & 255}"


def preprocess_ipv6(ip: str) -> Tuple[list, bool, int]:
    assert 2 < len(ip) <= 39 and not set(ip) - IPV6DIGITS
    compressed = False
    terminals = False
    segments = ip.lower().split(":")
    if not segments[0]:
        assert not segments[1]
        terminals = 1
        compressed = True
        segments = segments[2:]
    if not segments[-1]:
        assert not compressed and not segments[-2]
        terminals = 2
        segments = segments[:-2]
    return segments, compressed, terminals


def split_ipv6(ip: str) -> Deque[str]:
    segments, compressed, terminals = preprocess_ipv6(ip)
    chunks = deque()
    if terminals == 1:
        chunks.append("::")
    length = len(segments)
    minus_one = compressed or terminals or "" in segments
    if (minus_one and length == 8) or (not minus_one and length < 8) or length > 8:
        raise ValueError(f"{ip} is invalid")
    for seg in segments:
        if not seg:
            assert not compressed
            chunks.append("::")
            compressed = True
        else:
            assert len(seg) <= 4
            chunks.append(seg)
    if terminals == 2:
        chunks.append("::")
    return chunks


def parse_ipv6(ip: str) -> int:
    if ip == "::":
        return 0
    segments = split_ipv6(ip)
    pos = 7
    n = 0
    for i, seg in enumerate(segments):
        if seg == "::":
            pos = len(segments) - i - 2
        else:
            n += int(seg, 16) << pos * 16
            pos -= 1
    return n


def to_ipv6(n: int, compress: bool = True) -> str:
    assert 0 <= n <= MAX_IPV6
    ip = "{:039_x}".format(n).replace("_", ":")
    if compress:
        ip = ":".join(s.lstrip("0") if s != "0000" else "0" for s in ip.split(":"))
        longest = max(EMPTY.findall(ip), key=len, default="")
        if len(longest) > 3:
            ip = ip.replace(longest, "::", 1)
    return ip


def parse_ipv4_network(data: Tuple[str, int]) -> Tuple[int]:
    network, inforef = data
    start, slash = network.split("/")
    start = parse_ipv4(start)
    return start, start + (1 << 32 - int(slash)) - 1, inforef


def parse_ipv6_network(data: Tuple[str, int]) -> Tuple[int]:
    network, inforef = data
    start, slash = network.split("/")
    start = parse_ipv6(start)
    return start, start + (1 << 128 - int(slash)) - 1, inforef


sqlite3.register_adapter(int, lambda x: hex(x) if x > MAX_SQLITE_INT else x)
sqlite3.register_converter("integer", lambda b: int(b, 16 if b[:2] == b"0x" else 10))
conn = sqlite3.connect(
    "D:/network_guard/IPFire_locations.db", detect_types=sqlite3.PARSE_DECLTYPES
)
cur = conn.cursor()
cur.execute("select * from Country_Codes;")
COUNTRIES = cur.fetchall()
cur.execute("select * from Autonomous_Systems;")
AUTONOMOUS_SYSTEMS = dict(cur.fetchall())
cur.execute("select * from Network_Information;")
NETWORK_INFORMATION = cur.fetchall()
cur.execute("select * from IPv4;")
IPV4_STARTS, IPV4_ENDS, IPV4_INFOREFS = zip(*map(parse_ipv4_network, cur.fetchall()))
cur.execute("select * from IPv6;")
IPV6_STARTS, IPV6_ENDS, IPV6_INFOREFS = zip(*map(parse_ipv6_network, cur.fetchall()))
gc.collect()


def repr_ipv4(integer: int, string: str) -> dict:
    return {
        "string": string,
        "decimal": integer,
        "hexadecimal": f"{integer:08x}",
        "binary": f"{integer:032b}",
    }


def repr_ipv6(integer: int, string: str) -> dict:
    return {
        "string": string,
        "decimal": integer,
        "hexadecimal": "{:039_x}".format(integer),
        "binary": f"{integer:0128b}",
    }


def parse_network_info(index: int, reference: list) -> dict:
    asn, country, flags = NETWORK_INFORMATION[reference[index]]
    if asn is not None:
        asn = {"ASHandle": f"AS{asn}", "entity": AUTONOMOUS_SYSTEMS[asn], "number": asn}
    if country is not None:
        country = dict(zip(COUNTRY_FIELDS, COUNTRIES[country]))
    return {
        "Autonomous_System": asn,
        "Country": country,
        "Flags": dict(zip(FLAGS, (bool(flags & i) for i in (8, 4, 2, 1)))),
    }


def get_ipv4_info(ip: int) -> dict:
    if is_reserved_ipv4(ip):
        raise ValueError(f"{ip} is a reserved IPv4 address")
    i = bisect(IPV4_STARTS, ip) - 1
    if not (start := IPV4_STARTS[i]) <= ip <= (end := IPV4_ENDS[i]):
        return None
    return to_network(start, end, IPv4) | parse_network_info(i, IPV4_INFOREFS)


def get_ipv6_info(ip: int) -> dict:
    if is_reserved_ipv6(ip):
        raise ValueError(f"{ip} is a reserved IPv6 address")
    i = bisect(IPV6_STARTS, ip) - 1
    if not (start := IPV6_STARTS[i]) <= ip <= (end := IPV6_ENDS[i]):
        return None
    return to_network(start, end, IPv6) | parse_network_info(i, IPV6_INFOREFS)


class IPv4(Enum):
    parser = parse_ipv4
    formatter = to_ipv4
    power = 32
    maximum = MAX_IPV4
    query = get_ipv4_info
    repr = repr_ipv4


class IPv6(Enum):
    parser = parse_ipv6
    formatter = to_ipv6
    power = 128
    maximum = MAX_IPV6
    query = get_ipv6_info
    repr = repr_ipv6


def to_network(start: int, end: int, version: IPv4 | IPv6) -> dict:
    start_ip = version.formatter(start)
    count = end - start + 1
    slash = version.power.value - (count).bit_length() + 1
    mask = ((1 << slash) - 1) << (version.power.value - slash)
    return {
        "network": f"{start_ip}/{slash}",
        "version": version.__name__,
        "start": version.repr(start, start_ip),
        "end": version.repr(end, version.formatter(end)),
        "count": count,
        "slash": slash,
        "netmask": version.repr(mask, version.formatter(mask)),
    }


def get_ipinfo(ip: int | str, version: IPv4 | IPv6 = None) -> dict:
    if version:
        if isinstance(ip, str):
            ip = version.parser(ip)
        assert 0 <= ip <= version.maximum.value
        return version.query(ip)
    if isinstance(ip, str):
        version = IPv4 if IPV4_PATTERN.match(ip) else IPv6
    elif ip < 0 or ip > MAX_IPV6:
        raise ValueError(f"{ip} is not a valid IP address")
    else:
        version = IPv4 if ip <= MAX_IPV4 else IPv6
    return get_ipinfo(ip, version)


if __name__ == '__main__':
    print(json.dumps(get_ipinfo('142.250.65.174'), indent=4))
    print(json.dumps(get_ipinfo('2404:6800:4003:c03::88'), indent=4))

Timing

downloading data: 9.618 seconds
converting data: 36.807 seconds
analyzing data: 11.901 seconds
SQLite3 serializing: 5.419 seconds
JSON serializing: 2.775 seconds
verifying data: 19.156 seconds
full script: 86.491 seconds

Example output

{ "network": "142.250.0.0/15", "version": "IPv4", "start": { "string": "142.250.0.0", "decimal": 2398748672, "hexadecimal": "8efa0000", "binary": "10001110111110100000000000000000" }, "end": { "string": "142.251.255.255", "decimal": 2398879743, "hexadecimal": "8efbffff", "binary": "10001110111110111111111111111111" }, "count": 131072, "slash": 15, "netmask": { "string": "255.254.0.0", "decimal": 4294836224, "hexadecimal": "fffe0000", "binary": "11111111111111100000000000000000" }, "Autonomous_System": { "ASHandle": "AS15169", "entity": "GOOGLE", "number": 15169 }, "Country": { "Country_Code": "US", "Country_Name": "United States of America", "Continent_Code": "NA", "Continent_Name": "North America" }, "Flags": { "is_anonymous_proxy": false, "is_anycast": false, "is_satellite_provider": false, "bad": false } } { "network": "2404:6800::/32", "version": "IPv6", "start": { "string": "2404:6800::", "decimal": 47875086406289890508775266669543030784, "hexadecimal": "2404_6800_0000_0000_0000_0000_0000_0000", "binary": "00100100000001000110100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000" }, "end": { "string": "2404:6800:ffff:ffff:ffff:ffff:ffff:ffff", "decimal": 47875086485518053023039604263086981119, "hexadecimal": "2404_6800_ffff_ffff_ffff_ffff_ffff_ffff", "binary": "00100100000001000110100000000000111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111" }, "count": 79228162514264337593543950336, "slash": 32, "netmask": { "string": "ffff:ffff::", "decimal": 340282366841710300949110269838224261120, "hexadecimal": "ffff_ffff_0000_0000_0000_0000_0000_0000", "binary": "11111111111111111111111111111111000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000" }, "Autonomous_System": { "ASHandle": "AS15169", "entity": "GOOGLE", "number": 15169 }, "Country": { "Country_Code": "AU", "Country_Name": "Australia", "Continent_Code": "OC", "Continent_Name": "Oceania" }, "Flags": { "is_anonymous_proxy": false, "is_anycast": false, "is_satellite_provider": false, "bad": false } } 

I have checked the memory usage of the interpreter in task manager. Right before running the code to create the database, it uses around 193MiB RAM. Right after the script is completed and without doing anything else, the interpreter uses around 1223MiB RAM, so my data takes about 1030MiB. That is more than 1GiB, and this is after my compression.

I closed the interpreter and opened another session; it uses around 60MiB RAM on startup. I pasted the code to query the database and ran it; the interpreter uses around 360MiB RAM after that, so loading the data from the SQLite3 database takes around 300MiB RAM.

I didn't use the IPv4Network and IPv6Network classes from the ipaddress module because, as you can clearly see from my timing, converting the data to those classes is extremely slow: it takes around 15 seconds, while my own code takes around 6 seconds.

There are other IP geolocation database text dumps that I have access to, and my code can be trivially modified to convert those files as well.

How can I improve my code?


    1 Answer


    Here are a few suggestions that immediately come to mind. First, concerning the database creation script:

    1. You specify 'D:/network_guard/IPFire_locations.db' in multiple locations. Better would be to define a constant such as DB_PATH and use that instead.

    2. You delete the database file, but when you then create the database tables you use the "if not exists" clause, which seems superfluous.

    3. You read the entire input text file into the variable DATA, then write it out to disk and process it to load the database. You could save memory by fetching and processing the URL contents line by line. For example:

      with Path("D:/network_guard/database.txt").open("w") as database_txt:
          resp = requests.get(
              "https://git.ipfire.org/?p=location/location-database.git;a=blob_plain;f=database.txt;hb=HEAD",
              stream=True
          )
          entry = deque()
          for line in resp.iter_lines():
              line = line.decode()
              print(line, file=database_txt)
              if line.startswith("#"):
                  continue
              if not line:
                  if entry:
                      if entry[0][0] == "aut-num":
                          ASN.append((int(entry[0][1][2:]), entry[1][1]))
                      else:
                          assert entry[0][0] == "net"
                          ip = entry[0][1].split("/")[0]
                          table = IPV4_TABLE if IPV4_PATTERN.match(ip) else IPV6_TABLE
                          table.append(parse_entry(entry))
                      entry = deque()
              else:
                  key, val = line.split(":", 1)
                  entry.append((key, val.strip()))
    4. In function parse_ipv4 you have assert (match := IPV4_PATTERN.match(ip)). If you run Python with optimization enabled (-O), the assert expression will not be evaluated and match will be undefined. I would recommend that you change this to:

      match = IPV4_PATTERN.match(ip)
      assert match

    As for the script to query the database:

    1. parse_ipv4 should be changed as described above.

    2. Function get_ipinfo uses unnecessary recursion that improves neither efficiency nor clarity. Change to:

      def get_ipinfo(ip: int | str, version: IPv4 | IPv6 = None) -> dict:
          if not version:
              if isinstance(ip, str):
                  version = IPv4 if IPV4_PATTERN.match(ip) else IPv6
              elif ip < 0 or ip > MAX_IPV6:
                  raise ValueError(f"{ip} is not a valid IP address")
              else:
                  version = IPv4 if ip <= MAX_IPV4 else IPv6
          if isinstance(ip, str):
              ip = version.parser(ip)
          assert 0 <= ip <= version.maximum.value
          return version.query(ip)

    General Remarks

    1. You have functions (such as parse_ipv4, to_ipv4, parse_ipv6, to_ipv6, etc.) and code fragments that are common to both scripts and should be placed in a module that can be imported. For example, you might take the code fragment you use to connect to the database and place it in a function connect:

      def connect():
          sqlite3.register_adapter(int, lambda x: hex(x) if x > MAX_SQLITE_INT else x)
          sqlite3.register_converter(
              "integer", lambda b: int(b, 16 if b[:2] == b"0x" else 10)
          )
          return sqlite3.connect(
              "D:/network_guard/IPFire_locations.db", detect_types=sqlite3.PARSE_DECLTYPES
          )
    2. In function parse_ipv4 you must decide whether you are assuming the input is correct and you just need to parse the input string into its 4 decimal numbers or whether you want to rigorously validate the input string because it might be invalid. If the former, then you could use a more efficient method such as splitting the input on '.'. If the latter, then you should not use an assert statement to check whether the regex succeeded in matching the input. As I have mentioned, if you compile with optimization, then the assert statement will never be executed. Instead you should explicitly check match with an if statement and raise, for example, a ValueError if match is None. You should not in general be using assert statements to validate user input. Also, in a regex \d is not equivalent to [0-9] unless the re.ASCII flag is set. You should be using [0-9] instead. Since you are not using the ipaddress module, it would be less confusing if you did not import it.

    3. In your script to create the database, you have:

      table = IPV4_TABLE if IPV4_PATTERN.match(ip) else IPV6_TABLE 

      If you assume that the input data is valid, I would think this could be more cheaply calculated with:

      table = IPV4_TABLE if '.' in ip else IPV6_TABLE 
    4. You are using a deque as your "cache" but only appending items to it. For what it's worth, I benchmarked the repeated creation of a deque followed by appending N tuples of two integers to it, and then did the same for a list. On my desktop running Python 3.8.5, for small values of N (<= 10) the deque is slower (for N == 4, it is 12% slower). As N increases beyond 10, the deque starts to outperform the list (for N == 1_000_000, the list is 15% slower).
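      A minimal sketch of such a benchmark (the exact harness is illustrative; numbers will vary by machine and Python version):

      from collections import deque
      from timeit import timeit

      def fill(factory, n):
          container = factory()
          for i in range(n):
              container.append((i, i))
          return container

      for n in (4, 1_000_000):
          reps = max(1, 4_000_000 // n)   # keep total work roughly constant
          t_deque = timeit(lambda: fill(deque, n), number=reps)
          t_list = timeit(lambda: fill(list, n), number=reps)
          print(f"N={n}: deque {t_deque:.3f}s, list {t_list:.3f}s")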

    5. Comments in your code would aid readability and maintainability.

    • Correction, I didn't sort the contents of database.txt, I sorted the contents of countries.txt. database.txt is the huge file containing the actual data; countries.txt contains the country code reference used in database.txt. – Commented Jul 30, 2023 at 4:15
