#!/usr/bin/env python3 """ generate_nft_blacklist.py Reads prefixes from a file or stdin, aggregates them and writes nftables config. Uses named sets for efficient blacklist management. Usage: git clone https://github.com/C24Be/AS_Network_List.git generate_nft_blacklist.py ./AS_Network_List/blacklists/blacklist.txt nft_bl.conf cp nft_bl.conf /etc/nftables.d/ systemctl restart nftables """ import sys from ipaddress import ip_network, collapse_addresses from pathlib import Path from datetime import datetime def read_lines(path_or_dash): if path_or_dash == "-": print("Reading prefixes from STDIN...") return [ln.rstrip("\n") for ln in sys.stdin] p = Path(path_or_dash) if not p.exists(): raise FileNotFoundError(f"Input file not found: {path_or_dash}") text = p.read_text(encoding="utf-8") return text.splitlines() def aggregate_prefixes(lines): v4, v6, invalid = [], [], [] for lineno, ln in enumerate(lines, start=1): s = ln.strip() if not s or s.startswith("#"): continue try: net = ip_network(s, strict=False) if net.version == 4: v4.append(net) else: v6.append(net) except Exception as e: invalid.append((lineno, s, str(e))) agg_v4 = list(collapse_addresses(sorted(v4, key=lambda x: (int(x.network_address), x.prefixlen)))) agg_v6 = list(collapse_addresses(sorted(v6, key=lambda x: (int(x.network_address), x.prefixlen)))) return agg_v4, agg_v6, invalid def make_nft_config(agg_v4, agg_v6, comment=None): lines = [] lines.append("# Autogenerated nftables blacklist") lines.append(f"# Generated: {datetime.utcnow().isoformat()}Z") if comment: lines.append(f"# {comment}") lines.append(f"# IPv4: {len(agg_v4)}, IPv6: {len(agg_v6)}") lines.append("") lines.append("table inet filter {") lines.append("") # Define IPv4 blacklist set lines.append(" set blacklist_v4 {") lines.append(" type ipv4_addr") lines.append(" flags interval") if agg_v4: lines.append(" elements = {") for i, net in enumerate(agg_v4): comma = "," if i < len(agg_v4) - 1 else "" lines.append(f" {net.with_prefixlen}{comma}") lines.append(" }") lines.append(" }") lines.append("") # Define IPv6 blacklist set lines.append(" set blacklist_v6 {") lines.append(" type ipv6_addr") lines.append(" flags interval") if agg_v6: lines.append(" elements = {") for i, net in enumerate(agg_v6): comma = "," if i < len(agg_v6) - 1 else "" lines.append(f" {net.with_prefixlen}{comma}") lines.append(" }") lines.append(" }") lines.append("") # Define input chain with set lookups lines.append(" chain input {") lines.append(" type filter hook input priority 0;") lines.append(" policy accept;") lines.append("") lines.append(" ct state { established, related } accept") lines.append("") if agg_v4: lines.append(" ip saddr @blacklist_v4 counter drop") if agg_v6: lines.append(" ip6 saddr @blacklist_v6 counter drop") lines.append(" }") lines.append("}") return "\n".join(lines) def write_output(outpath, content): if outpath == "-": print(content) return p = Path(outpath) p.write_text(content, encoding="utf-8") p.chmod(0o644) print(f"Wrote nft config to: {p} (size: {p.stat().st_size} bytes)") def main(argv): if len(argv) < 3: print("Usage: python3 generate_nft_blacklist.py input.txt output.conf") print("Use '-' as input or output to mean STDIN/STDOUT respectively.") return 2 infile, outfile = argv[1], argv[2] try: lines = read_lines(infile) except Exception as e: print(f"ERROR reading input: {e}", file=sys.stderr) return 3 if not any(line.strip() and not line.strip().startswith("#") for line in lines): print("WARNING: input contains no prefixes (empty or only comments). Nothing to aggregate.") nft_conf = make_nft_config([], [], comment="Empty input produced no prefixes") write_output(outfile, nft_conf) return 0 agg_v4, agg_v6, invalid = aggregate_prefixes(lines) if invalid: print("Some lines could not be parsed (line, text, error):") for ln, txt, err in invalid: print(f" {ln}: '{txt}' --> {err}", file=sys.stderr) print(f"Aggregated IPv4 prefixes: {len(agg_v4)}") for n in agg_v4: print(" v4:", n) print(f"Aggregated IPv6 prefixes: {len(agg_v6)}") for n in agg_v6: print(" v6:", n) nft_conf = make_nft_config(agg_v4, agg_v6, comment=f"Source: {infile}") try: write_output(outfile, nft_conf) except Exception as e: print(f"ERROR writing output: {e}", file=sys.stderr) return 4 print("Done.") print("Load with: sudo nft -f ") print("View counters: sudo nft list chain inet filter input -a") print("View sets: sudo nft list set inet filter blacklist_v4") print(" sudo nft list set inet filter blacklist_v6") return 0 if __name__ == "__main__": sys.exit(main(sys.argv))