From 8ca8769a7777b8a68fcf331606ede475f37ad44b Mon Sep 17 00:00:00 2001 From: Kirill Sobakin Date: Mon, 16 Mar 2026 19:32:14 +0300 Subject: [PATCH] Ref: to API stat.ripe.net. Some fixes --- get-subnets.py | 175 ++++++++++++++++++++++--------------------------- 1 file changed, 78 insertions(+), 97 deletions(-) diff --git a/get-subnets.py b/get-subnets.py index ede98c9..d4a1cff 100755 --- a/get-subnets.py +++ b/get-subnets.py @@ -5,27 +5,24 @@ import urllib.request import os import shutil import json +import sys -BGP_TOOLS_URL = 'https://bgp.tools/table.txt' +RIPE_STAT_URL = 'https://stat.ripe.net/data/announced-prefixes/data.json?resource=AS{}' HEADERS = { 'User-Agent': 'itdog.info - hi@itdog.info' } -AS_FILE = 'AS.lst' IPv4_DIR = 'Subnets/IPv4' IPv6_DIR = 'Subnets/IPv6' -AS_META = ['32934','63293','54115','149642'] -AS_TWITTER = ['13414'] -AS_TELEGRAM = ['44907','59930','62014','62041','211157'] -AS_HETZNER = ['24940'] -AS_OVH = ['16276'] -AS_DIGITALOCEAN = ['14061'] +ASN_SERVICES = { + 'meta.lst': ['32934', '63293', '54115', '149642'], + 'twitter.lst': ['13414'], + 'hetzner.lst': ['24940'], + 'ovh.lst': ['16276'], + 'digitalocean.lst': ['14061'], +} -META = 'meta.lst' -TWITTER = 'twitter.lst' +ASN_TELEGRAM = ['44907', '59930', '62014', '62041', '211157'] TELEGRAM = 'telegram.lst' CLOUDFLARE = 'cloudflare.lst' -HETZNER = 'hetzner.lst' -OVH = 'ovh.lst' -DIGITALOCEAN = 'digitalocean.lst' CLOUDFRONT = 'cloudfront.lst' # From https://iplist.opencck.org/ @@ -52,28 +49,35 @@ GOOGLE_MEET_V6 = [ '2001:4860:4864:6::/64', ] -AWS_IP_RANGES_URL='https://ip-ranges.amazonaws.com/ip-ranges.json' - -subnet_list = [] +AWS_CIDR_URL='https://ip-ranges.amazonaws.com/ip-ranges.json' def subnet_summarization(subnet_list): subnets = [ipaddress.ip_network(subnet) for subnet in subnet_list] return list(ipaddress.collapse_addresses(subnets)) -def process_subnets(subnet_list, target_as): +def fetch_asn_prefixes(asn_list): ipv4_subnets = [] ipv6_subnets = [] - for subnet_str, as_number in subnet_list: + for asn in asn_list: + url = RIPE_STAT_URL.format(asn) + req = urllib.request.Request(url, headers=HEADERS) try: - subnet = ipaddress.ip_network(subnet_str) - if as_number in target_as: - if subnet.version == 4: - ipv4_subnets.append(subnet_str) - elif subnet.version == 6: - ipv6_subnets.append(subnet_str) - except ValueError: - print(f"Invalid subnet: {subnet_str}") + with urllib.request.urlopen(req) as response: + data = json.loads(response.read().decode('utf-8')) + for entry in data['data']['prefixes']: + prefix = entry['prefix'] + try: + network = ipaddress.ip_network(prefix) + if network.version == 4: + ipv4_subnets.append(prefix) + else: + ipv6_subnets.append(prefix) + except ValueError: + print(f"Invalid subnet: {prefix}") + sys.exit(1) + except Exception as e: + print(f"Error fetching AS{asn}: {e}") sys.exit(1) ipv4_merged = subnet_summarization(ipv4_subnets) @@ -85,60 +89,66 @@ def download_ready_subnets(url_v4, url_v6): ipv4_subnets = [] ipv6_subnets = [] - urls = [(url_v4, 4), (url_v6, 6)] + urls = [url_v4, url_v6] - for url, version in urls: + for url in urls: req = urllib.request.Request(url, headers=HEADERS) try: with urllib.request.urlopen(req) as response: - if response.status == 200: - subnets = response.read().decode('utf-8').splitlines() - for subnet_str in subnets: - try: - subnet = ipaddress.ip_network(subnet_str) - if subnet.version == 4: - ipv4_subnets.append(subnet_str) - elif subnet.version == 6: - ipv6_subnets.append(subnet_str) - except ValueError: - print(f"Invalid subnet: {subnet_str}") - sys.exit(1) + subnets = response.read().decode('utf-8').splitlines() + for subnet_str in subnets: + try: + subnet = ipaddress.ip_network(subnet_str) + if subnet.version == 4: + ipv4_subnets.append(subnet_str) + elif subnet.version == 6: + ipv6_subnets.append(subnet_str) + except ValueError: + print(f"Invalid subnet: {subnet_str}") + sys.exit(1) except Exception as e: print(f"Query error: {e}") + sys.exit(1) return ipv4_subnets, ipv6_subnets def download_ready_split_subnets(url): - req = urllib.request.Request(url) + req = urllib.request.Request(url, headers=HEADERS) with urllib.request.urlopen(req) as response: subnets = response.read().decode('utf-8').splitlines() - ipv4_subnets = [cidr for cidr in subnets if isinstance(ipaddress.ip_network(cidr, strict=False), ipaddress.IPv4Network)] - ipv6_subnets = [cidr for cidr in subnets if isinstance(ipaddress.ip_network(cidr, strict=False), ipaddress.IPv6Network)] - + ipv4_subnets = [] + ipv6_subnets = [] + for cidr in subnets: + network = ipaddress.ip_network(cidr, strict=False) + if network.version == 4: + ipv4_subnets.append(cidr) + else: + ipv6_subnets.append(cidr) + return ipv4_subnets, ipv6_subnets def download_aws_cloudfront_subnets(): ipv4_subnets = [] ipv6_subnets = [] - req = urllib.request.Request(AWS_IP_RANGES_URL, headers=HEADERS) + req = urllib.request.Request(AWS_CIDR_URL, headers=HEADERS) try: with urllib.request.urlopen(req) as response: - if response.status == 200: - data = json.loads(response.read().decode('utf-8')) - - for prefix in data.get('prefixes', []): - if prefix.get('service') == 'CLOUDFRONT': - ipv4_subnets.append(prefix['ip_prefix']) - - for prefix in data.get('ipv6_prefixes', []): - if prefix.get('service') == 'CLOUDFRONT': - ipv6_subnets.append(prefix['ipv6_prefix']) - + data = json.loads(response.read().decode('utf-8')) + + for prefix in data.get('prefixes', []): + if prefix.get('service') == 'CLOUDFRONT': + ipv4_subnets.append(prefix['ip_prefix']) + + for prefix in data.get('ipv6_prefixes', []): + if prefix.get('service') == 'CLOUDFRONT': + ipv6_subnets.append(prefix['ipv6_prefix']) + except Exception as e: print(f"Error downloading AWS CloudFront ranges: {e}") - + sys.exit(1) + return ipv4_subnets, ipv6_subnets def write_subnets_to_file(subnets, filename): @@ -152,38 +162,11 @@ def copy_file_legacy(src_filename): shutil.copy(src_filename, os.path.join(os.path.dirname(src_filename), new_filename)) if __name__ == '__main__': - request = urllib.request.Request(BGP_TOOLS_URL, headers=HEADERS) - - with urllib.request.urlopen(request) as response: - for line in response: - decoded_line = line.decode('utf-8').strip() - subnet, as_number = decoded_line.split() - subnet_list.append((subnet, as_number)) - - # Meta - ipv4_merged_meta, ipv6_merged_meta = process_subnets(subnet_list, AS_META) - write_subnets_to_file(ipv4_merged_meta, f'{IPv4_DIR}/{META}') - write_subnets_to_file(ipv6_merged_meta, f'{IPv6_DIR}/{META}') - - # Twitter - ipv4_merged_twitter, ipv6_merged_twitter = process_subnets(subnet_list, AS_TWITTER) - write_subnets_to_file(ipv4_merged_twitter, f'{IPv4_DIR}/{TWITTER}') - write_subnets_to_file(ipv6_merged_twitter, f'{IPv6_DIR}/{TWITTER}') - - # Hetzner - ipv4_merged_hetzner, ipv6_merged_hetzner = process_subnets(subnet_list, AS_HETZNER) - write_subnets_to_file(ipv4_merged_hetzner, f'{IPv4_DIR}/{HETZNER}') - write_subnets_to_file(ipv6_merged_hetzner, f'{IPv6_DIR}/{HETZNER}') - - # OVH - ipv4_merged_ovh, ipv6_merged_ovh = process_subnets(subnet_list, AS_OVH) - write_subnets_to_file(ipv4_merged_ovh, f'{IPv4_DIR}/{OVH}') - write_subnets_to_file(ipv6_merged_ovh, f'{IPv6_DIR}/{OVH}') - - # Digital Ocean - ipv4_merged_digitalocean, ipv6_merged_digitalocean = process_subnets(subnet_list, AS_DIGITALOCEAN) - write_subnets_to_file(ipv4_merged_digitalocean, f'{IPv4_DIR}/{DIGITALOCEAN}') - write_subnets_to_file(ipv6_merged_digitalocean, f'{IPv6_DIR}/{DIGITALOCEAN}') + # Services from ASN (meta, twitter, hetzner, ovh, digitalocean) + for filename, asn_list in ASN_SERVICES.items(): + ipv4_merged, ipv6_merged = fetch_asn_prefixes(asn_list) + write_subnets_to_file(ipv4_merged, f'{IPv4_DIR}/{filename}') + write_subnets_to_file(ipv6_merged, f'{IPv6_DIR}/{filename}') # Discord voice ipv4_discord, ipv6_discord = download_ready_subnets(DISCORD_VOICE_V4, DISCORD_VOICE_V6) @@ -192,7 +175,7 @@ if __name__ == '__main__': # Telegram ipv4_telegram_file, ipv6_telegram_file = download_ready_split_subnets(TELEGRAM_CIDR_URL) - ipv4_telegram_asn, ipv6_telegram_asn = process_subnets(subnet_list, AS_TELEGRAM) + ipv4_telegram_asn, ipv6_telegram_asn = fetch_asn_prefixes(ASN_TELEGRAM) ipv4_telegram = subnet_summarization(ipv4_telegram_file + [str(s) for s in ipv4_telegram_asn]) ipv6_telegram = subnet_summarization(ipv6_telegram_file + [str(s) for s in ipv6_telegram_asn]) write_subnets_to_file(ipv4_telegram, f'{IPv4_DIR}/{TELEGRAM}') @@ -212,10 +195,8 @@ if __name__ == '__main__': write_subnets_to_file(ipv4_cloudfront, f'{IPv4_DIR}/{CLOUDFRONT}') write_subnets_to_file(ipv6_cloudfront, f'{IPv6_DIR}/{CLOUDFRONT}') - # Legacy name - copy_file_legacy(f'{IPv4_DIR}/{META}') - copy_file_legacy(f'{IPv6_DIR}/{META}') - copy_file_legacy(f'{IPv4_DIR}/{TWITTER}') - copy_file_legacy(f'{IPv6_DIR}/{TWITTER}') - copy_file_legacy(f'{IPv4_DIR}/{DISCORD}') - copy_file_legacy(f'{IPv6_DIR}/{DISCORD}') \ No newline at end of file + # Legacy copies with capitalized names (e.g. meta.lst -> Meta.lst) + LEGACY_FILES = ['meta.lst', 'twitter.lst', 'discord.lst'] + for legacy_file in LEGACY_FILES: + copy_file_legacy(f'{IPv4_DIR}/{legacy_file}') + copy_file_legacy(f'{IPv6_DIR}/{legacy_file}') \ No newline at end of file